Skip to content

Commit

Permalink
RDB Loader: drop legacy S3 paths (close #198)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Nov 30, 2020
1 parent f94a9b2 commit bef1cde
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 340 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ package com.snowplowanalytics.snowplow.rdbloader
package discovery

import scala.concurrent.duration._

import cats._
import cats.data._
import cats.implicits._
import cats.effect.Timer

import com.snowplowanalytics.snowplow.rdbloader.config.Semver
import com.snowplowanalytics.snowplow.rdbloader.LoaderError._
import com.snowplowanalytics.snowplow.rdbloader.dsl.{AWS, Cache, Logging}
Expand Down Expand Up @@ -411,11 +413,6 @@ object DataDiscovery {

/** Shredded key that doesn't need a JSONPath file and can be converted directly to a `ShreddedDataKeyFinal` */
private case class ShreddedDataKeyTabular(key: S3.Key, info: ShreddedType.Info) extends DataKeyIntermediate {
def base: S3.Folder = {
val atomicEvents = S3.Key.getParent(key)
S3.Folder.getParent(atomicEvents)
}

def toFinal: ShreddedDataKeyFinal =
ShreddedDataKeyFinal(key, ShreddedType.Tabular(info))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,8 @@ object ShreddedType {
* @param jsonPaths existing JSONPaths file
*/
case class Json(info: Info, jsonPaths: S3.Key) extends ShreddedType {
def getLoadPath: String = {
if (info.shredJob <= ShredJobBeforeSparkVersion) {
s"${info.base}${info.vendor}/${info.name}/jsonschema/${info.model}-"
} else {
s"${info.base}shredded-types/vendor=${info.vendor}/name=${info.name}/format=jsonschema/version=${info.model}-"
}
}
def getLoadPath: String =
s"${info.base}shredded-types/vendor=${info.vendor}/name=${info.name}/format=jsonschema/version=${info.model}-"

def show: String = s"${info.toCriterion.asString} ($jsonPaths)"
}
Expand Down Expand Up @@ -108,16 +103,8 @@ object ShreddedType {
"""/format=(?<format>[a-zA-Z0-9-_]+)""" +
"""/version=(?<model>[1-9][0-9]*)$""").r

/** Version of legacy Shred job, where old path pattern was used `com.acme/event/jsonschema/1-0-0` */
val ShredJobBeforeSparkVersion = Semver(0,11,0)

/** Version of legacy Shred job, where TSV output was not possible */
val ShredJobBeforeTabularVersion = Semver(0,15,0) // TODO: is it

/**
* vendor + name + format + version + filename
*/
private val MinShreddedPathLengthLegacy = 5
val ShredJobBeforeTabularVersion = Semver(0,15,0)

/**
* "shredded-types" + vendor + name + format + version + filename
Expand Down Expand Up @@ -224,7 +211,7 @@ object ShreddedType {
*/
def transformPath(key: S3.Key, shredJob: Semver): Either[DiscoveryFailure, (Boolean, Info)] = {
val (bucket, path) = S3.splitS3Key(key)
val (subpath, shredpath) = splitFilpath(path, shredJob)
val (subpath, shredpath) = splitFilpath(path)
extractSchemaKey(shredpath, shredJob) match {
case Some(Extracted.Legacy(SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)))) =>
val prefix = S3.Folder.coerce("s3://" + bucket + "/" + subpath)
Expand All @@ -247,20 +234,16 @@ object ShreddedType {

/**
* Extract `SchemaKey` from subpath, which can be
* legacy-style (pre-0.12.0) com.acme/schema-name/jsonschema/1-0-0 or
* modern-style (post-0.12.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1-0-0
* tsv-style (port-0.16.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1
* json-style (post-0.12.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1-0-0
* tsv-style (post-0.16.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1
* This function transforms any of the above valid paths to `SchemaKey`
*
* @param subpath S3 subpath of four `SchemaKey` elements
* @param shredJob shred job version to decide what format should be present
* @return valid schema key if found
*/
def extractSchemaKey(subpath: String, shredJob: Semver): Option[Extracted] = {
if (shredJob <= ShredJobBeforeSparkVersion) {
val uri = "iglu:" + subpath
SchemaKey.fromUri(uri).toOption.map(Extracted.Legacy)
} else subpath match {
def extractSchemaKey(subpath: String, shredJob: Semver): Option[Extracted] =
subpath match {
case ShreddedSubpathPattern(vendor, name, format, version) =>
val uri = s"iglu:$vendor/$name/$format/$version"
SchemaKey.fromUri(uri).toOption.map(Extracted.Legacy)
Expand All @@ -271,7 +254,6 @@ object ShreddedType {
}
case _ => None
}
}

/**
* Split S3 filepath (without bucket name) into subpath and shreddedpath
Expand All @@ -284,17 +266,9 @@ object ShreddedType {
* @param path S3 key without bucket name
* @return pair of subpath and shredpath
*/
private def splitFilpath(path: String, shredJob: Semver): (String, String) = {
if (shredJob <= ShredJobBeforeSparkVersion) {
path.split("/").reverse.splitAt(MinShreddedPathLengthLegacy) match {
case (reverseSchema, reversePath) =>
(reversePath.reverse.mkString("/"), reverseSchema.tail.reverse.mkString("/"))
}
} else {
path.split("/").reverse.splitAt(MinShreddedPathLengthModern) match {
case (reverseSchema, reversePath) =>
(reversePath.reverse.mkString("/"), reverseSchema.tail.reverse.mkString("/"))
}
private def splitFilpath(path: String): (String, String) =
path.split("/").reverse.splitAt(MinShreddedPathLengthModern) match {
case (reverseSchema, reversePath) =>
(reversePath.reverse.mkString("/"), reverseSchema.tail.reverse.mkString("/"))
}
}
}
Loading

0 comments on commit bef1cde

Please sign in to comment.