Skip to content

Commit

Permalink
Common: get rid of atomic-events folder (close #183)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Jan 20, 2021
1 parent d1fe634 commit a3ddcc2
Show file tree
Hide file tree
Showing 33 changed files with 202 additions and 295 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,26 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.common

import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.core.{SchemaVer, SchemaKey}

import com.snowplowanalytics.iglu.client.resolver.registries.Registry

import com.snowplowanalytics.snowplow.rdbloader.common.Config.Formats
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{ShreddedType, Format}

/**
* Various common utility functions
*/
object Common {

val AtomicSchema: SchemaKey =
SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0))
val AtomicType = ShreddedType(AtomicSchema, Format.TSV)
val AtomicPath: String = entityPath(AtomicType)

/**
 * Build the S3 subpath for a shredded entity in the new folder layout:
 * `vendor=V/name=N/format=F/model=M` (replaces the legacy `atomic-events` layout)
 */
def entityPath(entity: ShreddedType): String =
  s"vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}"

/**
* Remove all occurrences of access key id and secret access key from message
* Helps to avoid publishing credentials on insecure channels
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ object LoaderMessage {
SchemaKey("com.snowplowanalytics.snowplow.storage.rdbloader", "shredding_complete", "jsonschema", SchemaVer.Full(1,0,0))

/** Data format for shredded data */
/** Data format for shredded data */
sealed trait Format extends Product with Serializable {
  /** Lower-cased constructor name, used as the `format=` segment of shredded S3 paths */
  def path: String = this.toString.toLowerCase
}
object Format {
final case object TSV extends Format
final case object JSON extends Format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,6 @@ object S3 {

case class BlobObject(key: Key, size: Long)

/**
* Extract `s3://path/run=YYYY-MM-dd-HH-mm-ss/atomic-events/` part from
* `s3://path/run=YYYY-MM-dd-HH-mm-ss/atomic-events/somefile`
*
* @param s string probably containing run id and atomic events subpath
* @return string refined as folder
*/
def getAtomicPath(s: Key): Option[Folder] =
s match {
case AtomicSubpathPattern(prefix, subpath, _) =>
Some(Folder.coerce(prefix + "/" + subpath))
case _ => None
}

/**
* Refined type for AWS S3 key, allowing only valid S3 paths
* (with `s3://` prefix and without trailing slash)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ import com.snowplowanalytics.snowplow.scalatracker.UUIDProvider

package object common {

/** Subpath pattern to check `atomic-events` directory presence inside a `run=...` folder */
val AtomicSubpathPattern = "(.*)/(run=[0-9]{4}-[0-1][0-9]-[0-3][0-9]-[0-2][0-9]-[0-6][0-9]-[0-6][0-9]/atomic-events)/(.*)".r
// groups in run id:       year    month      day        hour       minute     second

implicit val catsClockIdInstance: Clock[Id] = new Clock[Id] {
override def realTime(unit: TimeUnit): Id[Long] =
unit.convert(System.currentTimeMillis(), MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import cats.implicits._

import com.snowplowanalytics.snowplow.rdbloader.common.Config.Compression
import com.snowplowanalytics.snowplow.rdbloader.{DiscoveryStep, DiscoveryStream, LoaderError, LoaderAction, State}
import com.snowplowanalytics.snowplow.rdbloader.common.{S3, Message, Config, LoaderMessage}
import com.snowplowanalytics.snowplow.rdbloader.common.{Config, Message, LoaderMessage, S3, Common}
import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, AWS, Cache}

/**
Expand All @@ -34,7 +34,7 @@ case class DataDiscovery(base: S3.Folder, shreddedTypes: List[ShreddedType], com

/**
 * Full S3 path of the folder with atomic events.
 * No longer the legacy `atomic-events` directory: it is now the shredded-style
 * path built from `Common.AtomicPath` (`vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1`)
 */
def atomicEvents: S3.Folder =
  S3.Folder.append(base, Common.AtomicPath)

def show: String = {
val shreddedTypesList = shreddedTypes.map(x => s" * ${x.show}").mkString("\n")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import scala.util.matching.Regex
import cats.{Apply, Monad}
import cats.implicits._

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaVer, SchemaKey}
import com.snowplowanalytics.iglu.core.SchemaCriterion

import com.snowplowanalytics.snowplow.rdbloader.DiscoveryAction
import com.snowplowanalytics.snowplow.rdbloader.common.{S3, LoaderMessage, Semver}
import com.snowplowanalytics.snowplow.rdbloader.common.{S3, LoaderMessage, Semver, Common}
import com.snowplowanalytics.snowplow.rdbloader.dsl.{AWS, Cache}
import com.snowplowanalytics.snowplow.rdbloader.common.Common.toSnakeCase

Expand All @@ -31,6 +31,15 @@ sealed trait ShreddedType {
def getLoadPath: String
/** Human-readable form */
def show: String

/**
 * Check if this type is the special atomic (`events` table) type.
 * Only the TSV (Tabular) encoding can be atomic; JSON-encoded types never are.
 */
def isAtomic: Boolean = this match {
  case ShreddedType.Tabular(ShreddedType.Info(_, vendor, name, model, _)) =>
    // Compare all three coordinates against the canonical atomic schema
    vendor == Common.AtomicSchema.vendor && name == Common.AtomicSchema.name && model == Common.AtomicSchema.version.model
  case _ =>
    false
}

}

/**
Expand All @@ -46,7 +55,7 @@ object ShreddedType {
*/
final case class Json(info: Info, jsonPaths: S3.Key) extends ShreddedType {
  /** S3 prefix loaded by COPY, using the new `vendor=/name=/format=json/model=` layout */
  def getLoadPath: String =
    s"${info.base}vendor=${info.vendor}/name=${info.name}/format=json/model=${info.model}"

  /** Human-readable form: schema criterion plus the JSONPaths file backing this type */
  def show: String = s"${info.toCriterion.asString} ($jsonPaths)"
}
Expand All @@ -59,7 +68,7 @@ object ShreddedType {
*/
final case class Tabular(info: Info) extends ShreddedType {
  /** S3 prefix loaded by COPY, using the new `vendor=/name=/format=tsv/model=` layout */
  def getLoadPath: String =
    s"${info.base}vendor=${info.vendor}/name=${info.name}/format=tsv/model=${info.model}"

  /** Human-readable form: schema criterion tagged as TSV */
  def show: String = s"${info.toCriterion.asString} TSV"
}
Expand All @@ -69,7 +78,7 @@ object ShreddedType {
* It cannot be counted as "final" shredded type,
* as it's not proven to have JSONPaths file
*
* @param base s3 path run folder
* @param vendor self-describing type's vendor
* @param name self-describing type's name
* @param model self-describing type's SchemaVer model
Expand Down Expand Up @@ -113,24 +122,22 @@ object ShreddedType {

/** Regex extracting a JSON shredded type's coordinates (vendor, name, model) from a `shredded/good` subpath */
val ShreddedSubpathPattern: Regex =
  ("""vendor=(?<vendor>[a-zA-Z0-9-_.]+)""" +
    """/name=(?<name>[a-zA-Z0-9-_]+)""" +
    """/format=json""" +
    """/model=(?<model>[1-9][0-9]*)$""").r

/** Regex extracting a TSV shredded type's coordinates (vendor, name, model) from a `shredded/good` subpath */
val ShreddedSubpathPatternTabular: Regex =
  ("""vendor=(?<vendor>[a-zA-Z0-9-_.]+)""" +
    """/name=(?<name>[a-zA-Z0-9-_]+)""" +
    """/format=tsv""" +
    """/model=(?<model>[1-9][0-9]*)$""").r

/**
 * Minimal number of `/`-separated segments in a modern shredded path:
 * vendor + name + format + model + filename
 */
private val MinShreddedPathLengthModern = 5

/**
* Check where JSONPaths file for particular shredded type exists:
Expand Down Expand Up @@ -212,13 +219,13 @@ object ShreddedType {
*/
def transformPath(key: S3.Key, shredJob: Semver): Either[DiscoveryFailure, (Boolean, Info)] = {
val (bucket, path) = S3.splitS3Key(key)
val (subpath, shredpath) = splitFilpath(path)
val (subpath, shredpath) = splitFilepath(path)
extractSchemaKey(shredpath) match {
case Some(Extracted.Legacy(SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)))) =>
case Some(Extracted.Legacy(vendor, name, model)) =>
val prefix = S3.Folder.coerce("s3://" + bucket + "/" + subpath)
val result = Info(prefix, vendor, name, model, shredJob)
(false, result).asRight
case Some(Extracted.Tabular(vendor, name, _, model)) =>
case Some(Extracted.Tabular(vendor, name, model)) =>
val prefix = S3.Folder.coerce("s3://" + bucket + "/" + subpath)
val result = Info(prefix, vendor, name, model, shredJob)
(true, result).asRight
Expand All @@ -229,8 +236,8 @@ object ShreddedType {

/** Shredded-type coordinates extracted from an S3 subpath */
sealed trait Extracted
object Extracted {
  /** JSON-encoded (legacy) shredded type */
  final case class Legacy(vendor: String, name: String, model: Int) extends Extracted
  /** TSV-encoded shredded type */
  final case class Tabular(vendor: String, name: String, model: Int) extends Extracted
}

/**
Expand All @@ -244,15 +251,19 @@ object ShreddedType {
*/
/**
 * Try to extract a shredded type's coordinates from an S3 subpath.
 *
 * @param subpath S3 subpath such as `vendor=com.acme/name=event/format=tsv/model=1`
 * @return extracted coordinates for JSON or TSV layouts, `None` for anything else
 */
def extractSchemaKey(subpath: String): Option[Extracted] =
  subpath match {
    case ShreddedSubpathPattern(vendor, name, model) =>
      // the model group matches [1-9][0-9]*, so toInt can fail only on Int overflow
      scala.util.Try(model.toInt).toOption.map(Extracted.Legacy(vendor, name, _))
    case ShreddedSubpathPatternTabular(vendor, name, model) =>
      scala.util.Try(model.toInt).toOption.map(Extracted.Tabular(vendor, name, _))
    case _ =>
      // non-shredded paths (e.g. atomic events) are expected here — no debug output
      // (removed leftover `println(subpath)` that leaked every unmatched key to stdout)
      None
  }

/**
Expand All @@ -266,7 +277,7 @@ object ShreddedType {
* @param path S3 key without bucket name
* @return pair of subpath and shredpath
*/
private def splitFilepath(path: String): (String, String) =
  // Keep the last MinShreddedPathLengthModern segments as the shred path;
  // everything before them is the run-folder subpath. `.tail` drops the filename.
  path.split("/").reverse.splitAt(MinShreddedPathLengthModern) match {
    case (reverseSchema, reversePath) =>
      (reversePath.reverse.mkString("/"), reverseSchema.tail.reverse.mkString("/"))
  }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ object RedshiftStatements {
* More than one `RedshiftLoadStatements` must be grouped with others using `buildQueue`
*/
private[loading] def getStatements(config: Config[Redshift], discovery: DataDiscovery): RedshiftStatements = {
val shreddedStatements = discovery.shreddedTypes.map(transformShreddedType(config, discovery.compression))
val shreddedStatements = discovery
.shreddedTypes
.filterNot(_.isAtomic)
.map(transformShreddedType(config, discovery.compression))
val transitCopy = config.steps.contains(Step.TransitCopy)
val compressionFormat = getCompressionFormat(discovery.compression)
val atomic = buildEventsCopy(config, discovery.atomicEvents, transitCopy, compressionFormat)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2"""
}

def e3 = {
  // Modern layout: vendor=/name=/format=json/model=N
  val path = "vendor=com.snowplowanalytics.snowplow/name=submit_form/format=json/model=1/part-00000-00001"
  val key = S3.Key.coerce(s"s3://rdb-test/$path")
  val result = ShreddedType.transformPath(key, Semver(0,13,0))
  val expected = (false, Info(S3.Folder.coerce("s3://rdb-test"), "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,13,0)))
  result must beRight(expected)
}

def e4 = {
val key = S3.Key.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00000-00001")
val key = S3.Key.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=json/model=1/part-00000-00001")

val expectedPrefix = S3.Folder.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/")
val expected = (false, Info(expectedPrefix, "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,13,0)))
Expand All @@ -65,12 +65,12 @@ class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2"""
}

def e5 = {
  // TSV layout is recognized and parsed into Extracted.Tabular
  val input = "vendor=com.snowplow/name=event/format=tsv/model=1"
  ShreddedType.extractSchemaKey(input) must beSome(Extracted.Tabular("com.snowplow", "event", 1))
}

def e6 = {
val key = S3.Key.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/shredded-tsv/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1/part-00000-00001")
val key = S3.Key.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=tsv/model=1/part-00000-00001")

val expectedPrefix = S3.Folder.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/")
val expected = (true, Info(expectedPrefix, "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,16,0)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class CommonSpec extends Specification {

val expected = List(
"BEGIN",
"COPY atomic.events FROM 's3://shredded/base/atomic-events/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/shredded-types/vendor=com.acme/name=json-context/format=jsonschema/version=1-' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.events FROM 's3://shredded/base/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COMMIT"
)

Expand All @@ -59,8 +59,8 @@ class CommonSpec extends Specification {

val expected = List(
"BEGIN",
"COPY atomic.events FROM 's3://shredded/base/atomic-events/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/shredded-types/vendor=com.acme/name=json-context/format=jsonschema/version=1-' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.events FROM 's3://shredded/base/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"ACK",
"COMMIT"
)
Expand All @@ -80,8 +80,8 @@ class CommonSpec extends Specification {

val expected = List(
"BEGIN",
"COPY atomic.events FROM 's3://shredded/base/atomic-events/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/shredded-types/vendor=com.acme/name=json-context/format=jsonschema/version=1-' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.events FROM 's3://shredded/base/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
"COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP",
)

val result = Common.load[Pure](SpecHelpers.validCliConfig, message).runS
Expand Down
Loading

0 comments on commit a3ddcc2

Please sign in to comment.