Loader: drop legacy S3 paths (close #198)
chuwy committed Oct 28, 2020
1 parent bf1c45d commit 43155a6
Showing 4 changed files with 35 additions and 335 deletions.
@@ -14,10 +14,12 @@ package com.snowplowanalytics.snowplow.rdbloader
package discovery

import scala.concurrent.duration._

import cats._
import cats.data._
import cats.implicits._
import cats.effect.Timer

import com.snowplowanalytics.snowplow.rdbloader.config.Semver
import com.snowplowanalytics.snowplow.rdbloader.LoaderError._
import com.snowplowanalytics.snowplow.rdbloader.dsl.{AWS, Cache, Logging}
@@ -43,13 +43,8 @@ object ShreddedType {
* @param jsonPaths existing JSONPaths file
*/
case class Json(info: Info, jsonPaths: S3.Key) extends ShreddedType {
def getLoadPath: String = {
if (info.shredJob <= ShredJobBeforeSparkVersion) {
s"${info.base}${info.vendor}/${info.name}/jsonschema/${info.model}-"
} else {
s"${info.base}shredded-types/vendor=${info.vendor}/name=${info.name}/format=jsonschema/version=${info.model}-"
}
}
def getLoadPath: String =
s"${info.base}shredded-types/vendor=${info.vendor}/name=${info.name}/format=jsonschema/version=${info.model}-"

def show: String = s"${info.toCriterion.asString} ($jsonPaths)"
}
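With the version check gone, Json.getLoadPath always emits the partitioned layout. A minimal sketch of the resulting prefix, using hypothetical values (bucket, vendor, name and versions are illustrative, not taken from this diff):

    val info = ShreddedType.Info(
      S3.Folder.coerce("s3://my-bucket/shredded/good/run=2020-10-28-00-00-00/"),
      "com.acme", "event", 1, Semver(0,18,0))
    // Always the shredded-types layout now, regardless of shred job version:
    ShreddedType.Json(info, S3.Key.coerce("s3://assets/event_1.json")).getLoadPath
    // => "s3://my-bucket/shredded/good/run=2020-10-28-00-00-00/shredded-types/vendor=com.acme/name=event/format=jsonschema/version=1-"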
@@ -108,16 +103,8 @@ object ShreddedType {
"""/format=(?<format>[a-zA-Z0-9-_]+)""" +
"""/version=(?<model>[1-9][0-9]*)$""").r

/** Version of legacy Shred job, where old path pattern was used `com.acme/event/jsonschema/1-0-0` */
val ShredJobBeforeSparkVersion = Semver(0,11,0)

/** Version of legacy Shred job, where TSV output was not possible */
val ShredJobBeforeTabularVersion = Semver(0,15,0) // TODO: is it

/**
* vendor + name + format + version + filename
*/
private val MinShreddedPathLengthLegacy = 5
val ShredJobBeforeTabularVersion = Semver(0,15,0)

/**
* "shredded-types" + vendor + name + format + version + filename
@@ -224,7 +211,7 @@ object ShreddedType {
*/
def transformPath(key: S3.Key, shredJob: Semver): Either[DiscoveryFailure, (Boolean, Info)] = {
val (bucket, path) = S3.splitS3Key(key)
val (subpath, shredpath) = splitFilpath(path, shredJob)
val (subpath, shredpath) = splitFilpath(path)
extractSchemaKey(shredpath, shredJob) match {
case Some(Extracted.Legacy(SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)))) =>
val prefix = S3.Folder.coerce("s3://" + bucket + "/" + subpath)
@@ -247,20 +234,16 @@

/**
* Extract `SchemaKey` from subpath, which can be
* legacy-style (pre-0.12.0) com.acme/schema-name/jsonschema/1-0-0 or
* modern-style (post-0.12.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1-0-0
* tsv-style (port-0.16.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1
* json-style (post-0.12.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1-0-0
* tsv-style (post-0.16.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1
* This function transforms any of above valid paths to `SchemaKey`
*
* @param subpath S3 subpath of four `SchemaKey` elements
* @param shredJob shred job version to decide what format should be present
* @return valid schema key if found
*/
def extractSchemaKey(subpath: String, shredJob: Semver): Option[Extracted] = {
if (shredJob <= ShredJobBeforeSparkVersion) {
val uri = "iglu:" + subpath
SchemaKey.fromUri(uri).toOption.map(Extracted.Legacy)
} else subpath match {
def extractSchemaKey(subpath: String, shredJob: Semver): Option[Extracted] =
subpath match {
case ShreddedSubpathPattern(vendor, name, format, version) =>
val uri = s"iglu:$vendor/$name/$format/$version"
SchemaKey.fromUri(uri).toOption.map(Extracted.Legacy)
@@ -271,7 +254,6 @@
}
case _ => None
}
}
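Only the partitioned subpath styles survive the deletion above; the pre-0.12.0 layout no longer parses. A sketch following the path shapes named in the doc comment (illustrative inputs; the full regex patterns are folded out of view here):

    // json-style subpath still matches and yields a SchemaKey:
    extractSchemaKey("vendor=com.acme/name=event/format=jsonschema/version=1-0-0", Semver(0,18,0))
    // => Some(Extracted.Legacy(SchemaKey("com.acme", "event", "jsonschema", SchemaVer.Full(1, 0, 0))))
    // The pre-0.12.0 layout was handled only by the deleted branch:
    extractSchemaKey("com.acme/event/jsonschema/1-0-0", Semver(0,18,0))
    // => None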

/**
* Split S3 filepath (without bucket name) into subpath and shreddedpath
@@ -284,17 +266,9 @@
* @param path S3 key without bucket name
* @return pair of subpath and shredpath
*/
private def splitFilpath(path: String, shredJob: Semver): (String, String) = {
if (shredJob <= ShredJobBeforeSparkVersion) {
path.split("/").reverse.splitAt(MinShreddedPathLengthLegacy) match {
case (reverseSchema, reversePath) =>
(reversePath.reverse.mkString("/"), reverseSchema.tail.reverse.mkString("/"))
}
} else {
path.split("/").reverse.splitAt(MinShreddedPathLengthModern) match {
case (reverseSchema, reversePath) =>
(reversePath.reverse.mkString("/"), reverseSchema.tail.reverse.mkString("/"))
}
private def splitFilpath(path: String): (String, String) =
path.split("/").reverse.splitAt(MinShreddedPathLengthModern) match {
case (reverseSchema, reversePath) =>
(reversePath.reverse.mkString("/"), reverseSchema.tail.reverse.mkString("/"))
}
}
}
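With the legacy branch removed, splitFilpath has a single splitting rule. A sketch, assuming MinShreddedPathLengthModern counts the six elements listed in its comment (hypothetical key, bucket name already stripped by S3.splitS3Key):

    val path = "shredded/good/run=2020-10-28-00-00-00/" +
      "shredded-types/vendor=com.acme/name=event/format=jsonschema/version=1-0-0/part-00000"
    // Reversed segments are split at the six-element mark; .tail drops the filename
    splitFilpath(path)
    // => ("shredded/good/run=2020-10-28-00-00-00",
    //     "shredded-types/vendor=com.acme/name=event/format=jsonschema/version=1-0-0")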
@@ -15,95 +15,26 @@ package com.snowplowanalytics.snowplow.rdbloader.discovery
import java.util.UUID

import cats.syntax.either._
import org.specs2.Specification

import com.snowplowanalytics.snowplow.rdbloader.{LoaderError, TestInterpreter}
import com.snowplowanalytics.snowplow.rdbloader.utils.S3.Folder.{coerce => dir}
import com.snowplowanalytics.snowplow.rdbloader.utils.S3.Key.{coerce => s3key}
import com.snowplowanalytics.snowplow.rdbloader.TestInterpreter.{AWSResults, ControlResults, Test, TestState}
import com.snowplowanalytics.snowplow.rdbloader.config.Semver
import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType._
import com.snowplowanalytics.snowplow.rdbloader.dsl.{AWS, Cache, Logging}
import com.snowplowanalytics.snowplow.rdbloader.utils.S3

import org.specs2.Specification


class DataDiscoverySpec extends Specification { def is = s2"""
Successfully discover two run folders at once $e1
Fail to proceed with empty target folder $e3
Do not fail to proceed with empty shredded good folder $e4
Successfully discover data in run folder $e5
Successfully discover data in specific folder $e6
Successfully discover several folders using `InSpecificFolder` (decide if desired) $e7
listGoodBucket ignores special files $e8
show DataDiscovery with several shredded types $e9
Fail to proceed with empty target folder $e1
Do not fail to proceed with empty shredded good folder $e2
listGoodBucket ignores special files $e3
show DataDiscovery with several shredded types $e4
"""

val id = UUID.fromString("8ad6fc06-ae5c-4dfc-a14d-f2ae86755179")

def e1 = {
def listS3(bucket: S3.Folder) =
List(
S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-0000"),
S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-0001"),
S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/com.mailchimp/email_address_change/jsonschema/1-0-0/part-00001"),
S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/com.mailchimp/email_address_change/jsonschema/1-0-0/part-00002"),
S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/com.mailchimp/email_address_change/jsonschema/2-0-0/part-00001"),

S3.Key.coerce(bucket + "run=2017-05-22-16-00-57/atomic-events/part-0000"),
S3.Key.coerce(bucket + "run=2017-05-22-16-00-57/atomic-events/part-0001"),
S3.Key.coerce(bucket + "run=2017-05-22-16-00-57/com.snowplowanalytics.snowplow/add_to_cart/jsonschema/1-0-0/part-00000"),
S3.Key.coerce(bucket + "run=2017-05-22-16-00-57/com.snowplowanalytics.snowplow/add_to_cart/jsonschema/1-0-0/part-00001")
).map(k => S3.BlobObject(k, 1L)).asRight[LoaderError]

def keyExists(k: S3.Key): Boolean = {
k.toString match {
case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json" => true
case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json" => true
case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/add_to_cart_1.json" => true
case _ => false
}
}

implicit val control: Logging[Test] = TestInterpreter.stateControlInterpreter(ControlResults.init)
implicit val aws: AWS[Test] = TestInterpreter.stateAwsInterpreter(AWSResults.init.copy(listS3 = Test.liftWith(listS3), keyExists = keyExists))
implicit val cache: Cache[Test] = TestInterpreter.stateCacheInterpreter

val shreddedGood = S3.Folder.coerce("s3://runfolder-test/shredded/good/")
val discoveryTarget = DataDiscovery.Global(shreddedGood)
val (_, result) = DataDiscovery.discover[Test](discoveryTarget, Semver(0,11,0), "us-east-1", None).value.run(TestState.init).value

val expected = List(
DataDiscovery(
dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),
Some(2L),
Some(2L),
List(
ShreddedType.Json(
Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",2,Semver(0,11,0,None)),
s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json")),
ShreddedType.Json(
Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",1,Semver(0,11,0,None)),
s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json"))
),
specificFolder = false
),

DataDiscovery(
dir("s3://runfolder-test/shredded/good/run=2017-05-22-16-00-57/"),
Some(2L),
Some(2L),
List(
ShreddedType.Json(
Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-16-00-57/"), "com.snowplowanalytics.snowplow","add_to_cart",1,Semver(0,11,0,None)),
s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.snowplowanalytics.snowplow/add_to_cart_1.json"))
),
specificFolder = false
)
)

result must beRight(expected)
}

def e3 = {
def e1 = {

implicit val cache: Cache[Test] = TestInterpreter.stateCacheInterpreter
implicit val control: Logging[Test] = TestInterpreter.stateControlInterpreter(ControlResults.init)
Expand All @@ -119,7 +50,7 @@ class DataDiscoverySpec extends Specification { def is = s2"""
result must beLeft(expected)
}

def e4 = {
def e2 = {
implicit val cache: Cache[Test] = TestInterpreter.stateCacheInterpreter
implicit val control: Logging[Test] = TestInterpreter.stateControlInterpreter(ControlResults.init)
implicit val aws: AWS[Test] = TestInterpreter.stateAwsInterpreter(AWSResults.init)
Expand All @@ -135,155 +66,7 @@ class DataDiscoverySpec extends Specification { def is = s2"""
result must beRight(expected)
}

def e5 = {
def listS3(bucket: S3.Folder) =
List(
S3.Key.coerce(bucket + "atomic-events/part-0000"),
S3.Key.coerce(bucket + "atomic-events/part-0001"),
S3.Key.coerce(bucket + "com.mailchimp/email_address_change/jsonschema/1-0-0/part-00001"),
S3.Key.coerce(bucket + "com.mailchimp/email_address_change/jsonschema/1-0-0/part-00002"),
S3.Key.coerce(bucket + "com.mailchimp/email_address_change/jsonschema/2-0-0/part-00001")
).map(k => S3.BlobObject(k, 1L)).asRight[LoaderError]

def keyExists(k: S3.Key): Boolean =
k.toString match {
case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json" => true
case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json" => true
case _ => false
}

implicit val control: Logging[Test] = TestInterpreter.stateControlInterpreter(ControlResults.init)
implicit val aws: AWS[Test] = TestInterpreter.stateAwsInterpreter(AWSResults.init.copy(listS3 = Test.liftWith(listS3), keyExists = keyExists))
implicit val cache: Cache[Test] = TestInterpreter.stateCacheInterpreter

val expected = List(
DataDiscovery(
dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),
Some(2L),
Some(2L),
List(
ShreddedType.Json(
Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",2,Semver(0,11,0,None)),
s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json")),
ShreddedType.Json(
Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",1,Semver(0,11,0,None)),
s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json"))
),
specificFolder = false
)
)

val shreddedGood = S3.Folder.coerce("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/")
val discoveryTarget = DataDiscovery.Global(shreddedGood)
val (_, result) = DataDiscovery.discover[Test](discoveryTarget, Semver(0,11,0), "us-east-1", None).value.run(TestState.init).value

result must beRight(expected)
}

def e6 = {
def listS3(bucket: S3.Folder) =
List(
S3.Key.coerce(bucket + "atomic-events/part-0000"),
S3.Key.coerce(bucket + "atomic-events/part-0001"),
S3.Key.coerce(bucket + "com.mailchimp/email_address_change/jsonschema/1-0-0/part-00001"),
S3.Key.coerce(bucket + "com.mailchimp/email_address_change/jsonschema/1-0-0/part-00002"),
S3.Key.coerce(bucket + "com.mailchimp/email_address_change/jsonschema/2-0-0/part-00001")
).map(k => S3.BlobObject(k, 1L)).asRight[LoaderError]

def keyExists(k: S3.Key): Boolean =
k.toString match {
case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json" => true
case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json" => true
case _ => false
}

implicit val control: Logging[Test] = TestInterpreter.stateControlInterpreter(ControlResults.init)
implicit val aws: AWS[Test] = TestInterpreter.stateAwsInterpreter(AWSResults.init.copy(listS3 = Test.liftWith(listS3), keyExists = keyExists))
implicit val cache: Cache[Test] = TestInterpreter.stateCacheInterpreter

val targetFolder = S3.Folder.coerce("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/")

val expected = List(
DataDiscovery(
dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),
Some(2L),
Some(2L),
List(
ShreddedType.Json(
Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",2,Semver(0,11,0,None)),
s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json")),
ShreddedType.Json(
Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",1,Semver(0,11,0,None)),
s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json"))
),
specificFolder = true
)
)

val discoveryTarget = DataDiscovery.InSpecificFolder(targetFolder)
val (_, result) = DataDiscovery.discover[Test](discoveryTarget, Semver(0,11,0), "us-east-1", None).value.run(TestState.init).value

result must beRight(expected)
}

def e7 = {
def listS3(bucket: S3.Folder) =
List(
S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-0000"),
S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/atomic-events/part-0001"),
S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/com.mailchimp/email_address_change/jsonschema/1-0-0/part-00001"),
S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/com.mailchimp/email_address_change/jsonschema/1-0-0/part-00002"),
S3.Key.coerce(bucket + "run=2017-05-22-12-20-57/com.mailchimp/email_address_change/jsonschema/2-0-0/part-00001"),
// Another folder
S3.Key.coerce(bucket + "run=2018-10-12-10-20-00/atomic-events/part-0000"),
S3.Key.coerce(bucket + "run=2018-10-12-10-20-00/atomic-events/part-0001")
).map(k => S3.BlobObject(k, 1L)).asRight[LoaderError]

def keyExists(k: S3.Key): Boolean =
k.toString match {
case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json" => true
case "s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json" => true
case _ => false
}


implicit val control: Logging[Test] = TestInterpreter.stateControlInterpreter(ControlResults.init)
implicit val aws: AWS[Test] = TestInterpreter.stateAwsInterpreter(AWSResults.init.copy(listS3 = Test.liftWith(listS3), keyExists = keyExists))
implicit val cache: Cache[Test] = TestInterpreter.stateCacheInterpreter

val targetFolder = S3.Folder.coerce("s3://runfolder-test/shredded/good/")

val expected = List(
DataDiscovery(
dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),
Some(2L),
Some(2L),
List(
ShreddedType.Json(
Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",2,Semver(0,11,0,None)),
s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_2.json")),
ShreddedType.Json(
Info(dir("s3://runfolder-test/shredded/good/run=2017-05-22-12-20-57/"),"com.mailchimp","email_address_change",1,Semver(0,11,0,None)),
s3key("s3://snowplow-hosted-assets-us-east-1/4-storage/redshift-storage/jsonpaths/com.mailchimp/email_address_change_1.json"))
),
specificFolder = true
),
DataDiscovery(
dir("s3://runfolder-test/shredded/good/run=2018-10-12-10-20-00/"),
Some(2L),
Some(2L),
List(),
specificFolder = true
)
)

val discoveryTarget = DataDiscovery.InSpecificFolder(targetFolder)
val (_, result) = DataDiscovery.discover[Test](discoveryTarget, Semver(0,11,0), "us-east-1", None).value.run(TestState.init).value

result must beRight(expected)
}

def e8 = {
def e3 = {
def listS3(bucket: S3.Folder) =
List(
S3.BlobObject(S3.Key.join(bucket, "_SUCCESS"), 0L),
@@ -301,7 +84,7 @@ class DataDiscoverySpec extends Specification { def is = s2"""
result.map(_.length) must beRight(3)
}

def e9 = {
def e4 = {
val shreddedTypes = List(
ShreddedType.Json(ShreddedType.Info(S3.Folder.coerce("s3://my-bucket/my-path"), "com.acme", "event", 2, Semver(1,5,0)), S3.Key.coerce("s3://assets/event_1.json")),
ShreddedType.Json(ShreddedType.Info(S3.Folder.coerce("s3://my-bucket/my-path"), "com.acme", "context", 2, Semver(1,5,0)), S3.Key.coerce("s3://assets/context_1.json"))