From bef1cdea167aa972021897250b0f05d3f41212de Mon Sep 17 00:00:00 2001
From: Anton Parkhomenko
Date: Thu, 29 Oct 2020 01:48:23 +0300
Subject: [PATCH] RDB Loader: drop legacy S3 paths (close #198)

---
 .../rdbloader/discovery/DataDiscovery.scala   |   7 +-
 .../rdbloader/discovery/ShreddedType.scala    |  50 +---
 .../discovery/DataDiscoverySpec.scala         | 237 +-----------------
 .../discovery/ShreddedTypeSpec.scala          |  81 +-----
 4 files changed, 35 insertions(+), 340 deletions(-)

diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala
index ed3124536..e654ebda2 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala
@@ -14,10 +14,12 @@ package com.snowplowanalytics.snowplow.rdbloader
 package discovery
 
 import scala.concurrent.duration._
+
 import cats._
 import cats.data._
 import cats.implicits._
 import cats.effect.Timer
+
 import com.snowplowanalytics.snowplow.rdbloader.config.Semver
 import com.snowplowanalytics.snowplow.rdbloader.LoaderError._
 import com.snowplowanalytics.snowplow.rdbloader.dsl.{AWS, Cache, Logging}
@@ -411,11 +413,6 @@ object DataDiscovery {
 
   /** Shredded key that doesn't need a JSONPath file and can be mapped to final */
   private case class ShreddedDataKeyTabular(key: S3.Key, info: ShreddedType.Info) extends DataKeyIntermediate {
-    def base: S3.Folder = {
-      val atomicEvents = S3.Key.getParent(key)
-      S3.Folder.getParent(atomicEvents)
-    }
-
     def toFinal: ShreddedDataKeyFinal = ShreddedDataKeyFinal(key, ShreddedType.Tabular(info))
   }
 
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala
index aee5021f9..978e0e6fe 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala
@@ -43,13 +43,8 @@ object ShreddedType {
    * @param jsonPaths existing JSONPaths file
    */
   case class Json(info: Info, jsonPaths: S3.Key) extends ShreddedType {
-    def getLoadPath: String = {
-      if (info.shredJob <= ShredJobBeforeSparkVersion) {
-        s"${info.base}${info.vendor}/${info.name}/jsonschema/${info.model}-"
-      } else {
-        s"${info.base}shredded-types/vendor=${info.vendor}/name=${info.name}/format=jsonschema/version=${info.model}-"
-      }
-    }
+    def getLoadPath: String =
+      s"${info.base}shredded-types/vendor=${info.vendor}/name=${info.name}/format=jsonschema/version=${info.model}-"
 
     def show: String = s"${info.toCriterion.asString} ($jsonPaths)"
   }
@@ -108,16 +103,8 @@ object ShreddedType {
       """/format=(?<format>[a-zA-Z0-9-_]+)""" +
       """/version=(?<model>[1-9][0-9]*)$""").r
 
-  /** Version of legacy Shred job, where old path pattern was used `com.acme/event/jsonschema/1-0-0` */
-  val ShredJobBeforeSparkVersion = Semver(0,11,0)
-
   /** Version of legacy Shred job, where TSV output was not possible */
-  val ShredJobBeforeTabularVersion = Semver(0,15,0) // TODO: is it
-
-  /**
-   * vendor + name + format + version + filename
-   */
-  private val MinShreddedPathLengthLegacy = 5
+  val ShredJobBeforeTabularVersion = Semver(0,15,0)
 
   /**
    * "shredded-types" + vendor + name + format + version + filename
@@ -224,7 +211,7 @@ object ShreddedType {
    */
   def transformPath(key: S3.Key, shredJob: Semver): Either[DiscoveryFailure, (Boolean, Info)] = {
     val (bucket, path) = S3.splitS3Key(key)
-    val (subpath, shredpath) = splitFilpath(path, shredJob)
+    val (subpath, shredpath) = splitFilepath(path)
     extractSchemaKey(shredpath, shredJob) match {
       case Some(Extracted.Legacy(SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)))) =>
         val prefix = S3.Folder.coerce("s3://" + bucket + "/" + subpath)
@@ -247,20 +234,16 @@ object ShreddedType {
 
   /**
    * Extract `SchemaKey` from subpath, which can be
-   * legacy-style (pre-0.12.0) com.acme/schema-name/jsonschema/1-0-0 or
-   * modern-style (post-0.12.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1-0-0
-   * tsv-style (port-0.16.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1
+   * json-style (post-0.12.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1-0-0
+   * tsv-style (post-0.16.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1
    * This function transforms any of above valid paths to `SchemaKey`
    *
    * @param subpath S3 subpath of four `SchemaKey` elements
    * @param shredJob shred job version to decide what format should be present
    * @return valid schema key if found
    */
-  def extractSchemaKey(subpath: String, shredJob: Semver): Option[Extracted] = {
-    if (shredJob <= ShredJobBeforeSparkVersion) {
-      val uri = "iglu:" + subpath
-      SchemaKey.fromUri(uri).toOption.map(Extracted.Legacy)
-    } else subpath match {
+  def extractSchemaKey(subpath: String, shredJob: Semver): Option[Extracted] =
+    subpath match {
       case ShreddedSubpathPattern(vendor, name, format, version) =>
         val uri = s"iglu:$vendor/$name/$format/$version"
         SchemaKey.fromUri(uri).toOption.map(Extracted.Legacy)
@@ -271,7 +254,6 @@ object ShreddedType {
       }
       case _ => None
     }
-  }
 
   /**
    * Split S3 filepath (without bucket name) into subpath and shreddedpath
@@ -284,17 +266,9 @@ object ShreddedType {
    * @param path S3 key without bucket name
    * @return pair of subpath and shredpath
    */
-  private def splitFilpath(path: String, shredJob: Semver): (String, String) = {
-    if (shredJob <= ShredJobBeforeSparkVersion) {
-      path.split("/").reverse.splitAt(MinShreddedPathLengthLegacy) match {
-        case (reverseSchema, reversePath) =>
-          (reversePath.reverse.mkString("/"), reverseSchema.tail.reverse.mkString("/"))
-      }
-    } else {
-      path.split("/").reverse.splitAt(MinShreddedPathLengthModern) match {
-        case (reverseSchema, reversePath) =>
-          (reversePath.reverse.mkString("/"), reverseSchema.tail.reverse.mkString("/"))
-      }
+  private def splitFilepath(path: String): (String, String) =
+    path.split("/").reverse.splitAt(MinShreddedPathLengthModern) match {
+      case (reverseSchema, reversePath) =>
+        (reversePath.reverse.mkString("/"), reverseSchema.tail.reverse.mkString("/"))
     }
-  }
 }
diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala
index 2ed5cc758..2af8052d9 100644
--- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala
+++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala
@@ -15,95 +15,26 @@ package com.snowplowanalytics.snowplow.rdbloader.discovery
 import java.util.UUID
 
 import cats.syntax.either._
-import org.specs2.Specification
+
 import com.snowplowanalytics.snowplow.rdbloader.{LoaderError, TestInterpreter}
-import com.snowplowanalytics.snowplow.rdbloader.utils.S3.Folder.{coerce => dir}
-import com.snowplowanalytics.snowplow.rdbloader.utils.S3.Key.{coerce => s3key}
 import com.snowplowanalytics.snowplow.rdbloader.TestInterpreter.{AWSResults, ControlResults, Test, TestState}
 import com.snowplowanalytics.snowplow.rdbloader.config.Semver
-import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType._
 import com.snowplowanalytics.snowplow.rdbloader.dsl.{AWS, Cache, Logging}
 import com.snowplowanalytics.snowplow.rdbloader.utils.S3
 
+import org.specs2.Specification
+
 class DataDiscoverySpec extends Specification { def is = s2"""
-  Successfully discover two run folders at once $e1
-  Fail to proceed with empty target folder $e3
-  Do not fail to proceed with empty shredded good folder $e4
-  Successfully discover data in run folder $e5
-  Successfully discover data in specific folder $e6
-  Successfully discover several folders using `InSpecificFolder` (decide if desired) $e7
-  listGoodBucket ignores special files $e8
-  show DataDiscovery with several shredded types $e9
+  Fail to proceed with empty target folder $e1
+  Do not fail to proceed with empty shredded good folder $e2
+  listGoodBucket ignores special files $e3
+  show DataDiscovery with several shredded types $e4
   """
 
   val id = UUID.fromString("8ad6fc06-ae5c-4dfc-a14d-f2ae86755179")
 
   def e1 = {
-    def listS3(bucket: S3.Folder) =
-      List(
-        S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-0000"),
-        S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-0001"),
-        S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/com.mailchimp/email_address_change/jsonschema/1-0-0/part-00001"),
-        S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/com.mailchimp/email_address_change/jsonschema/1-0-0/part-00002"),
-        S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/com.mailchimp/email_address_change/jsonschema/2-0-0/part-00001"),
-
-        S3.Key.coerce(bucket + "run=2017-05-22-16-00-57/atomic-events/part-0000"),
-        S3.Key.coerce(bucket + "run=2017-05-22-16-00-57/atomic-events/part-0001"),
-        S3.Key.coerce(bucket + "run=2017-05-22-16-00-57/com.snowplowanalytics.snowplow/add_to_cart/jsonschema/1-0-0/part-00000"),
-        S3.Key.coerce(bucket + "run=2017-05-22-16-00-57/com.snowplowanalytics.snowplow/add_to_cart/jsonschema/1-0-0/part-00001")
-      ).map(k => S3.BlobObject(k, 1L)).asRight[LoaderError]
-
-    def keyExists(k: S3.Key): Boolean = {
-      k.toString match {
-        case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json" => true
-        case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json" => true
-        case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/add_to_cart_1.json" => true
-        case _ => false
-      }
-    }
-
-    implicit val control: Logging[Test] = TestInterpreter.stateControlInterpreter(ControlResults.init)
-    implicit val aws: AWS[Test] = TestInterpreter.stateAwsInterpreter(AWSResults.init.copy(listS3 = Test.liftWith(listS3), keyExists = keyExists))
-    implicit val cache: Cache[Test] = TestInterpreter.stateCacheInterpreter
-
-    val shreddedGood = S3.Folder.coerce("s3://runfolder-test/shredded/good/")
-
-    val discoveryTarget = DataDiscovery.Global(shreddedGood)
-    val (_, result) = DataDiscovery.discover[Test](discoveryTarget, Semver(0,11,0), "us-east-1", None).value.run(TestState.init).value
-
-    val expected = List(
-      DataDiscovery(
dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"), - Some(2L), - Some(2L), - List( - ShreddedType.Json( - Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",2,Semver(0,11,0,None)), - s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json")), - ShreddedType.Json( - Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",1,Semver(0,11,0,None)), - s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json")) - ), - specificFolder = false - ), - - DataDiscovery( - dir("s3://runfolder-test/shredded/good/run=2017-05-22-16-00-57/"), - Some(2L), - Some(2L), - List( - ShreddedType.Json( - Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-16-00-57/"), "com.snowplowanalytics.snowplow","add_to_cart",1,Semver(0,11,0,None)), - s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/add_to_cart_1.json")) - ), - specificFolder = false - ) - ) - - result must beRight(expected) - } - - def e3 = { implicit val cache: Cache[Test] = TestInterpreter.stateCacheInterpreter implicit val control: Logging[Test] = TestInterpreter.stateControlInterpreter(ControlResults.init) @@ -119,7 +50,7 @@ class DataDiscoverySpec extends Specification { def is = s2""" result must beLeft(expected) } - def e4 = { + def e2 = { implicit val cache: Cache[Test] = TestInterpreter.stateCacheInterpreter implicit val control: Logging[Test] = TestInterpreter.stateControlInterpreter(ControlResults.init) implicit val aws: AWS[Test] = TestInterpreter.stateAwsInterpreter(AWSResults.init) @@ -135,155 +66,7 @@ class DataDiscoverySpec extends Specification { def is = s2""" result must beRight(expected) } - def e5 = { - def listS3(bucket: S3.Folder) = - List( - S3.Key.coerce(bucket + "atomic-events/part-0000"), - S3.Key.coerce(bucket + "atomic-events/part-0001"), - S3.Key.coerce(bucket + "com.mailchimp/email_address_change/jsonschema/1-0-0/part-00001"), - S3.Key.coerce(bucket + "com.mailchimp/email_address_change/jsonschema/1-0-0/part-00002"), - S3.Key.coerce(bucket + "com.mailchimp/email_address_change/jsonschema/2-0-0/part-00001") - ).map(k => S3.BlobObject(k, 1L)).asRight[LoaderError] - - def keyExists(k: S3.Key): Boolean = - k.toString match { - case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json" => true - case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json" => true - case _ => false - } - - implicit val control: Logging[Test] = TestInterpreter.stateControlInterpreter(ControlResults.init) - implicit val aws: AWS[Test] = TestInterpreter.stateAwsInterpreter(AWSResults.init.copy(listS3 = Test.liftWith(listS3), keyExists = keyExists)) - implicit val cache: Cache[Test] = TestInterpreter.stateCacheInterpreter - - val expected = List( - DataDiscovery( - dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"), - Some(2L), - Some(2L), - List( - ShreddedType.Json( - Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",2,Semver(0,11,0,None)), - s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json")), - ShreddedType.Json( - 
-            Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",1,Semver(0,11,0,None)),
-            s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json"))
-        ),
-        specificFolder = false
-      )
-    )
-
-    val shreddedGood = S3.Folder.coerce("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/")
-
-    val discoveryTarget = DataDiscovery.Global(shreddedGood)
-    val (_, result) = DataDiscovery.discover[Test](discoveryTarget, Semver(0,11,0), "us-east-1", None).value.run(TestState.init).value
-
-    result must beRight(expected)
-  }
-
-  def e6 = {
-    def listS3(bucket: S3.Folder) =
-      List(
-        S3.Key.coerce(bucket + "atomic-events/part-0000"),
-        S3.Key.coerce(bucket + "atomic-events/part-0001"),
-        S3.Key.coerce(bucket + "com.mailchimp/email_address_change/jsonschema/1-0-0/part-00001"),
-        S3.Key.coerce(bucket + "com.mailchimp/email_address_change/jsonschema/1-0-0/part-00002"),
-        S3.Key.coerce(bucket + "com.mailchimp/email_address_change/jsonschema/2-0-0/part-00001")
-      ).map(k => S3.BlobObject(k, 1L)).asRight[LoaderError]
-
-    def keyExists(k: S3.Key): Boolean =
-      k.toString match {
-        case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json" => true
-        case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json" => true
-        case _ => false
-      }
-
-    implicit val control: Logging[Test] = TestInterpreter.stateControlInterpreter(ControlResults.init)
-    implicit val aws: AWS[Test] = TestInterpreter.stateAwsInterpreter(AWSResults.init.copy(listS3 = Test.liftWith(listS3), keyExists = keyExists))
-    implicit val cache: Cache[Test] = TestInterpreter.stateCacheInterpreter
-
-    val targetFolder = S3.Folder.coerce("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/")
-
-    val expected = List(
-      DataDiscovery(
-        dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),
-        Some(2L),
-        Some(2L),
-        List(
-          ShreddedType.Json(
-            Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",2,Semver(0,11,0,None)),
-            s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json")),
-          ShreddedType.Json(
-            Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",1,Semver(0,11,0,None)),
-            s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json"))
-        ),
-        specificFolder = true
-      )
-    )
-
-    val discoveryTarget = DataDiscovery.InSpecificFolder(targetFolder)
-    val (_, result) = DataDiscovery.discover[Test](discoveryTarget, Semver(0,11,0), "us-east-1", None).value.run(TestState.init).value
-
-    result must beRight(expected)
-  }
-
-  def e7 = {
-    def listS3(bucket: S3.Folder) =
-      List(
-        S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-0000"),
-        S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-0001"),
-        S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/com.mailchimp/email_address_change/jsonschema/1-0-0/part-00001"),
-        S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/com.mailchimp/email_address_change/jsonschema/1-0-0/part-00002"),
-        S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/com.mailchimp/email_address_change/jsonschema/2-0-0/part-00001"),
-        // Another folder
-        S3.Key.coerce(bucket + "run=2018-10-12-10-20-00/atomic-events/part-0000"),
-        S3.Key.coerce(bucket + "run=2018-10-12-10-20-00/atomic-events/part-0001")
-      ).map(k => S3.BlobObject(k, 1L)).asRight[LoaderError]
-
-    def keyExists(k: S3.Key): Boolean =
-      k.toString match {
-        case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json" => true
-        case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json" => true
-        case _ => false
-      }
-
-    implicit val control: Logging[Test] = TestInterpreter.stateControlInterpreter(ControlResults.init)
-    implicit val aws: AWS[Test] = TestInterpreter.stateAwsInterpreter(AWSResults.init.copy(listS3 = Test.liftWith(listS3), keyExists = keyExists))
-    implicit val cache: Cache[Test] = TestInterpreter.stateCacheInterpreter
-
-    val targetFolder = S3.Folder.coerce("s3://runfolder-test/shredded/good/")
-
-    val expected = List(
-      DataDiscovery(
-        dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),
-        Some(2L),
-        Some(2L),
-        List(
-          ShreddedType.Json(
-            Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",2,Semver(0,11,0,None)),
-            s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json")),
-          ShreddedType.Json(
-            Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",1,Semver(0,11,0,None)),
-            s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json"))
-        ),
-        specificFolder = true
-      ),
-      DataDiscovery(
-        dir("s3://runfolder-test/shredded/good/run=2018-10-12-10-20-00/"),
-        Some(2L),
-        Some(2L),
-        List(),
-        specificFolder = true
-      )
-    )
-
-    val discoveryTarget = DataDiscovery.InSpecificFolder(targetFolder)
-    val (_, result) = DataDiscovery.discover[Test](discoveryTarget, Semver(0,11,0), "us-east-1", None).value.run(TestState.init).value
-
-    result must beRight(expected)
-  }
-
-  def e8 = {
+  def e3 = {
     def listS3(bucket: S3.Folder) =
       List(
         S3.BlobObject(S3.Key.join(bucket, "_SUCCESS"), 0L),
@@ -301,7 +84,7 @@ class DataDiscoverySpec extends Specification { def is = s2"""
     result.map(_.length) must beRight(3)
   }
 
-  def e9 = {
+  def e4 = {
     val shreddedTypes = List(
       ShreddedType.Json(ShreddedType.Info(S3.Folder.coerce("s3://my-bucket/my-path"), "com.acme", "event", 2, Semver(1,5,0)), S3.Key.coerce("s3://assets/event_1.json")),
       ShreddedType.Json(ShreddedType.Info(S3.Folder.coerce("s3://my-bucket/my-path"), "com.acme", "context", 2, Semver(1,5,0)), S3.Key.coerce("s3://assets/context_1.json"))
diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala
index 0919febb2..c58cead3e 100644
--- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala
+++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala
@@ -23,53 +23,29 @@ import com.snowplowanalytics.snowplow.rdbloader.config.Semver
 import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType._
 
 class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2"""
-  Transform correct S3 path $e1
-  Fail to transform path without valid vendor $e2
-  Fail to transform path without file $e3
-  Transform correct S3 path without base $e4
-  Transform correct S3 path for Shred job > 0.12.0 format $e5
-  Transform correct S3 path without root folder $e6
-  Modern and legacy transformation always give same result $e7
-  Transform full modern shredded key $e8
-  extractedSchemaKey parsed a path for tabular output $e9
-  Transform correct tabular S3 path $e10
+  Fail to transform path without valid vendor $e1
+  Fail to transform path without file $e2
+  Transform correct S3 path for Shred job $e3
+  Transform full modern shredded key $e4
+  extractSchemaKey parses a path for tabular output $e5
+  Transform correct tabular S3 path $e6
   """
 
-  import ShreddedTypeSpec._
-
   def e1 = {
-    val path = "cross-batch-test/shredded-archive/run=2017-04-27-14-39-42/com.snowplowanalytics.snowplow/submit_form/jsonschema/1-0-0/part-00000-00001"
-    val expectedPrefix = S3.Folder.coerce("s3://rdb-test/cross-batch-test/shredded-archive/run=2017-04-27-14-39-42")
-    val expected = (false, Info(expectedPrefix, "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,10,0)))
-    val key = S3.Key.coerce(s"s3://rdb-test/$path")
-
-    val result = ShreddedType.transformPath(key, Semver(0,10,0))
-    result must beRight(expected)
-  }
-
-  def e2 = {
     val path = "cross-batch-test/shredded-archive/run%3D2017-04-27-14-39-42/submit_form/jsonschema/1-0-0/part-00000-00001"
     val key = S3.Key.coerce(s"s3://rdb-test/$path")
     val result = ShreddedType.transformPath(key, Semver(0,10,0))
     result must beLeft
   }
 
-  def e3 = {
+  def e2 = {
     val path = "cross-batch-test/shredded-archive/run%3D2017-04-27-14-39-42/com.snowplowanalytics.snowplow/submit_form/jsonschema/1-0-0"
    val key = S3.Key.coerce(s"s3://rdb-test/$path")
     val result = ShreddedType.transformPath(key, Semver(0,12,0))
     result must beLeft
   }
 
-  def e4 = {
-    val path = "com.snowplowanalytics.snowplow/submit_form/jsonschema/1-0-0/part-00000-00001"
-    val key = S3.Key.coerce(s"s3://rdb-test/$path")
-    val result = ShreddedType.transformPath(key, Semver(0,10,0))
-    val expected = (false, Info(S3.Folder.coerce("s3://rdb-test"), "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,10,0)))
-    result must beRight(expected)
-  }
-
-  def e5 = {
+  def e3 = {
     val path = "vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00000-00001"
     val key = S3.Key.coerce(s"s3://rdb-test/shredded-types/$path")
     val result = ShreddedType.transformPath(key, Semver(0,13,0))
@@ -77,39 +53,7 @@ class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2"""
     result must beRight(expected)
   }
 
-  def e6 = {
-    val path = "run%3D2017-04-27-14-39-42/com.snowplowanalytics.snowplow/submit_form/jsonschema/1-0-0/part-00000-00001"
-    val key = S3.Key.coerce(s"s3://rdb-test/$path")
-
-    val expectedPrefix = S3.Folder.coerce("s3://rdb-test/run%3D2017-04-27-14-39-42")
-    val expected = (false, Info(expectedPrefix, "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,11,0)))
-
-    val result = ShreddedType.transformPath(key, Semver(0,11,0))
-    result must beRight(expected)
-  }
-
-  def e7 = {
-    prop { (elements: ShreddedTypeElements) => elements match {
-      case (subpath, vendor, name, format, model, revision, addition) =>
-        val legacy = s"s3://some-bucket/$subpath$vendor/$name/$format/$model-$revision-$addition/part-1"
-        val modern = s"s3://some-bucket/${subpath}shredded-types/vendor=$vendor/name=$name/format=$format/version=$model-$revision-$addition/part-2"
-        val legacyResult = ShreddedType.transformPath(S3.Key.coerce(legacy), Semver(0,11,0))
-        val modernResult = ShreddedType.transformPath(S3.Key.coerce(modern), Semver(0,12,0))
-        val eitherMatch = legacyResult.void.leftMap(_ => ()) must beEqualTo(modernResult.void.leftMap(_ => ()))
-        val valueMatch = (legacyResult, modernResult) match {
-          case (l: Right[_, _], m: Right[_, _]) =>
-            val legacy = l.value._2.copy(shredJob = Semver(0,0,0))  // Erase Shred job versions
-            val modern = m.value._2.copy(shredJob = Semver(0,0,0))
-            legacy must beEqualTo(modern)
-          case (Left(_), Left(_)) => ok
-          case _ => ko
-        }
-        eitherMatch.and(valueMatch)
-
-    } }.setGen(shreddedTypeElementsGen)
-  }
-
-  def e8 = {
+  def e4 = {
     val key = S3.Key.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00000-00001")
 
     val expectedPrefix = S3.Folder.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/")
@@ -119,12 +63,12 @@ class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2"""
     result must beRight(expected)
   }
 
-  def e9 = {
+  def e5 = {
     val input = "shredded-tsv/vendor=com.snowplow/name=event/format=jsonschema/version=1"
     ShreddedType.extractSchemaKey(input, Semver(0,18,0)) must beSome(Extracted.Tabular("com.snowplow", "event", "jsonschema", 1))
   }
 
-  def e10 = {
+  def e6 = {
     val key = S3.Key.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/shredded-tsv/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1/part-00000-00001")
 
     val expectedPrefix = S3.Folder.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/")
@@ -177,7 +121,4 @@ object ShreddedTypeSpec {
     revision <- Gen.chooseNum(0, 10)
     addition <- Gen.chooseNum(0, 10)
   } yield (subpath, vendor, name, format, model, revision, addition)
-
-  }
-
 }
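
Reviewer note, not part of the patch itself: after this change discovery only
recognises the two partitioned layouts exercised by the remaining tests. Below
is a rough, self-contained sketch of that matching behaviour in plain Scala.
`PathSketch` and `describe` are made-up names, and the regexes merely
approximate the real `ShreddedSubpathPattern` and
`ShreddedSubpathPatternTabular` in ShreddedType.scala:

    object PathSketch extends App {
      // JSON layout kept by this patch: full SchemaVer, e.g. version=1-0-0
      val Json =
        "^shredded-types/vendor=(.+)/name=(.+)/format=(.+)/version=([0-9]+-[0-9]+-[0-9]+)/.+$".r
      // TSV layout kept by this patch: model number only, e.g. version=1
      val Tabular =
        "^shredded-tsv/vendor=(.+)/name=(.+)/format=(.+)/version=([0-9]+)/.+$".r

      def describe(subpath: String): String = subpath match {
        case Json(vendor, name, format, version)  => s"json: iglu:$vendor/$name/$format/$version"
        case Tabular(vendor, name, format, model) => s"tsv:  iglu:$vendor/$name/$format/$model (model only)"
        case _                                    => "ignored, e.g. the dropped pre-0.12.0 layout"
      }

      List(
        "shredded-types/vendor=com.acme/name=event/format=jsonschema/version=1-0-0/part-00000",
        "shredded-tsv/vendor=com.acme/name=event/format=jsonschema/version=1/part-00000",
        "com.acme/event/jsonschema/1-0-0/part-00000" // legacy path, no longer discovered
      ).map(describe).foreach(println)
    }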