Common: get rid of atomic-events folder (close #183)
chuwy committed Jan 27, 2021
1 parent fdb32f3 commit 7bbf735
Showing 41 changed files with 269 additions and 332 deletions.
@@ -12,17 +12,26 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.common

import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.core.{SchemaVer, SchemaKey}

import com.snowplowanalytics.iglu.client.resolver.registries.Registry

import com.snowplowanalytics.snowplow.rdbloader.common.Config.Formats
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{ShreddedType, Format}

/**
* Various common utility functions
*/
object Common {

val AtomicSchema: SchemaKey =
SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0))
val AtomicType = ShreddedType(AtomicSchema, Format.TSV)
val AtomicPath: String = entityPath(AtomicType)

def entityPath(entity: ShreddedType) =
s"vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}"

/**
* Remove all occurrences of access key id and secret access key from message
* Helps to avoid publishing credentials on insecure channels
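For reference, here is how the new constants compose; a minimal REPL-style sketch (editor's illustration, not part of the commit), assuming the definitions above and the `Format.path` member added below:

Common.entityPath(Common.AtomicType)
// "vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1"
Common.AtomicPath   // same string, precomputed from AtomicType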
@@ -44,7 +44,9 @@ object LoaderMessage {
SchemaKey("com.snowplowanalytics.snowplow.storage.rdbloader", "shredding_complete", "jsonschema", SchemaVer.Full(1,0,0))

/** Data format for shredded data */
sealed trait Format extends Product with Serializable
sealed trait Format extends Product with Serializable {
def path: String = this.toString.toLowerCase
}
object Format {
final case object TSV extends Format
final case object JSON extends Format
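The new `path` member derives the directory segment by lowercasing the case object's name; a quick sketch of what the two objects shown above produce:

LoaderMessage.Format.TSV.path   // "tsv"
LoaderMessage.Format.JSON.path  // "json"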
@@ -87,20 +87,6 @@ object S3 {

case class BlobObject(key: Key, size: Long)

/**
* Extract `s3://path/run=YYYY-MM-dd-HH-mm-ss/atomic-events/` part from
* `s3://path/run=YYYY-MM-dd-HH-mm-ss/atomic-events/somefile`
*
* @param s string probably containing run id and atomic events subpath
* @return string refined as folder
*/
def getAtomicPath(s: Key): Option[Folder] =
s match {
case AtomicSubpathPattern(prefix, subpath, _) =>
Some(Folder.coerce(prefix + "/" + subpath))
case _ => None
}

/**
* Refined type for AWS S3 key, allowing only valid S3 paths
* (with `s3://` prefix and without trailing slash)
@@ -30,10 +30,6 @@ import com.snowplowanalytics.snowplow.scalatracker.UUIDProvider

package object common {

/** Subpath to check `atomic-events` directory presence */
val AtomicSubpathPattern = "(.*)/(run=[0-9]{4}-[0-1][0-9]-[0-3][0-9]-[0-2][0-9]-[0-6][0-9]-[0-6][0-9]/atomic-events)/(.*)".r
// year month day hour minute second

implicit val catsClockIdInstance: Clock[Id] = new Clock[Id] {
override def realTime(unit: TimeUnit): Id[Long] =
unit.convert(System.currentTimeMillis(), MILLISECONDS)
@@ -24,7 +24,7 @@ import fs2.Stream
import com.snowplowanalytics.snowplow.rdbloader.dsl.{JDBC, Environment, Logging}
import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig
import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery
import com.snowplowanalytics.snowplow.rdbloader.loading.Common.load
import com.snowplowanalytics.snowplow.rdbloader.loading.Load.load
import com.snowplowanalytics.snowplow.rdbloader.utils.SSH

import io.sentry.Sentry
@@ -27,7 +27,7 @@ import com.snowplowanalytics.snowplow.rdbloader.{ LoaderAction, LoaderError, Act
import com.snowplowanalytics.snowplow.rdbloader.db.Entities.{Columns, TableState}
import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, DiscoveryFailure, ShreddedType}
import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, Iglu, JDBC}
import com.snowplowanalytics.snowplow.rdbloader.loading.Common.SqlString
import com.snowplowanalytics.snowplow.rdbloader.loading.Load.SqlString

object Migration {
/**
@@ -36,7 +36,7 @@
* Do nothing in case there's only legacy JSON data
*/
def perform[F[_]: Monad: Logging: Iglu: JDBC](dbSchema: String, discovery: DataDiscovery): LoaderAction[F, Unit] =
discovery.shreddedTypes.traverse_ {
discovery.shreddedTypes.filterNot(_.isAtomic).traverse_ {
case ShreddedType.Tabular(ShreddedType.Info(_, vendor, name, model, _)) =>
for {
schemas <- EitherT(Iglu[F].getSchemas(vendor, name, model))
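Because the shredder now reports `atomic` as an ordinary `ShreddedType`, `Migration.perform` has to skip it before fetching schemas. A hedged sketch of the new guard, with illustrative values (`base` and `shredJob` are placeholders, not taken from the commit):

// isAtomic compares vendor, name and model against Common.AtomicSchema
val atomic = ShreddedType.Tabular(
  ShreddedType.Info(base, "com.snowplowanalytics.snowplow", "atomic", 1, shredJob))
val custom = ShreddedType.Tabular(
  ShreddedType.Info(base, "com.acme", "click", 1, shredJob))
List(atomic, custom).filterNot(_.isAtomic)  // List(custom)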
@@ -17,7 +17,7 @@ import cats.implicits._

import com.snowplowanalytics.snowplow.rdbloader.common.Config.Compression
import com.snowplowanalytics.snowplow.rdbloader.{DiscoveryStep, DiscoveryStream, LoaderError, LoaderAction, State}
import com.snowplowanalytics.snowplow.rdbloader.common.{S3, Message, Config, LoaderMessage}
import com.snowplowanalytics.snowplow.rdbloader.common.{Config, Message, LoaderMessage, S3, Common}
import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, AWS, Cache}

/**
@@ -34,7 +34,7 @@ case class DataDiscovery(base: S3.Folder, shreddedTypes: List[ShreddedType], com

/** `atomic-events` directory full path */
def atomicEvents: S3.Folder =
S3.Folder.append(base, "atomic-events")
S3.Folder.append(base, Common.AtomicPath)

def show: String = {
val shreddedTypesList = shreddedTypes.map(x => s" * ${x.show}").mkString("\n")
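Concretely, the hard-coded `atomic-events` suffix gives way to the same partitioned layout used for every other entity; a sketch with an invented run folder:

val base = S3.Folder.coerce("s3://archive/run=2021-01-27-12-00-00/")
// before: S3.Folder.append(base, "atomic-events")
//   -> s3://archive/run=2021-01-27-12-00-00/atomic-events/
// after:  S3.Folder.append(base, Common.AtomicPath)
//   -> s3://archive/run=2021-01-27-12-00-00/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/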
@@ -17,10 +17,10 @@ import scala.util.matching.Regex
import cats.{Apply, Monad}
import cats.implicits._

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaVer, SchemaKey}
import com.snowplowanalytics.iglu.core.SchemaCriterion

import com.snowplowanalytics.snowplow.rdbloader.DiscoveryAction
import com.snowplowanalytics.snowplow.rdbloader.common.{S3, LoaderMessage, Semver}
import com.snowplowanalytics.snowplow.rdbloader.common.{S3, LoaderMessage, Semver, Common}
import com.snowplowanalytics.snowplow.rdbloader.dsl.{AWS, Cache}
import com.snowplowanalytics.snowplow.rdbloader.common.Common.toSnakeCase

@@ -31,6 +31,15 @@ sealed trait ShreddedType {
def getLoadPath: String
/** Human-readable form */
def show: String

/** Check if this type is the special atomic type */
def isAtomic = this match {
case ShreddedType.Tabular(ShreddedType.Info(_, vendor, name, model, _)) =>
vendor == Common.AtomicSchema.vendor && name == Common.AtomicSchema.name && model == Common.AtomicSchema.version.model
case _ =>
false
}

}

/**
@@ -46,7 +55,7 @@ object ShreddedType {
*/
final case class Json(info: Info, jsonPaths: S3.Key) extends ShreddedType {
def getLoadPath: String =
s"${info.base}shredded-types/vendor=${info.vendor}/name=${info.name}/format=jsonschema/version=${info.model}-"
s"${info.base}vendor=${info.vendor}/name=${info.name}/format=json/model=${info.model}"

def show: String = s"${info.toCriterion.asString} ($jsonPaths)"
}
@@ -59,7 +68,7 @@
*/
final case class Tabular(info: Info) extends ShreddedType {
def getLoadPath: String =
s"${info.base}shredded-tsv/vendor=${info.vendor}/name=${info.name}/format=jsonschema/version=${info.model}"
s"${info.base}vendor=${info.vendor}/name=${info.name}/format=tsv/model=${info.model}"

def show: String = s"${info.toCriterion.asString} TSV"
}
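With this change JSON and TSV shredded data share a single layout, differing only in the `format=` segment (the old `shredded-types`/`shredded-tsv` prefixes are gone). A sketch of the resulting load prefixes, with an invented bucket and run folder:

// assuming info.base = "s3://bucket/run=2021-01-27-12-00-00/",
//          vendor = "com.acme", name = "click", model = 1
// Json(info, jsonPaths).getLoadPath:
//   s3://bucket/run=2021-01-27-12-00-00/vendor=com.acme/name=click/format=json/model=1
// Tabular(info).getLoadPath:
//   s3://bucket/run=2021-01-27-12-00-00/vendor=com.acme/name=click/format=tsv/model=1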
@@ -69,7 +78,7 @@
* It cannot be counted as "final" shredded type,
* as it's not proven to have JSONPaths file
*
* @param base s3 path run folder (without `shredded-types` suffix)
* @param base s3 path run folder
* @param vendor self-describing type's vendor
* @param name self-describing type's name
* @param model self-describing type's SchemaVer model
@@ -113,24 +122,22 @@

/** Regex to extract `SchemaKey` from `shredded/good` */
val ShreddedSubpathPattern: Regex =
("""shredded\-types""" +
"""/vendor=(?<vendor>[a-zA-Z0-9-_.]+)""" +
("""vendor=(?<vendor>[a-zA-Z0-9-_.]+)""" +
"""/name=(?<name>[a-zA-Z0-9-_]+)""" +
"""/format=(?<format>[a-zA-Z0-9-_]+)""" +
"""/version=(?<schemaver>[1-9][0-9]*(?:-(?:0|[1-9][0-9]*)){2})$""").r
"""/format=json""" +
"""/model=(?<model>[1-9][0-9]*)$""").r

/** Regex to extract `SchemaKey` from `shredded/good` */
val ShreddedSubpathPatternTabular: Regex =
("""shredded\-tsv""" +
"""/vendor=(?<vendor>[a-zA-Z0-9-_.]+)""" +
"""/name=(?<name>[a-zA-Z0-9-_]+)""" +
"""/format=(?<format>[a-zA-Z0-9-_]+)""" +
"""/version=(?<model>[1-9][0-9]*)$""").r
("""vendor=(?<vendor>[a-zA-Z0-9-_.]+)""" +
"""/name=(?<name>[a-zA-Z0-9-_]+)""" +
"""/format=tsv""" +
"""/model=(?<model>[1-9][0-9]*)$""").r

/**
* "shredded-types" + vendor + name + format + version + filename
* vendor + name + format + version + filename
*/
private val MinShreddedPathLengthModern = 6
private val MinShreddedPathLengthModern = 5

/**
* Check where JSONPaths file for particular shredded type exists:
@@ -210,29 +217,19 @@
* @param shredJob version of shred job to decide what path format should be present
* @return either discovery failure or info (which in turn can be tabular (true) or JSON (false))
*/
def transformPath(key: S3.Key, shredJob: Semver): Either[DiscoveryFailure, (Boolean, Info)] = {
def transformPath(key: S3.Key, shredJob: Semver): Either[DiscoveryFailure, (LoaderMessage.Format, Info)] = {
val (bucket, path) = S3.splitS3Key(key)
val (subpath, shredpath) = splitFilpath(path)
val (subpath, shredpath) = splitFilepath(path)
extractSchemaKey(shredpath) match {
case Some(Extracted.Legacy(SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)))) =>
val prefix = S3.Folder.coerce("s3://" + bucket + "/" + subpath)
val result = Info(prefix, vendor, name, model, shredJob)
(false, result).asRight
case Some(Extracted.Tabular(vendor, name, _, model)) =>
case Some((vendor, name, model, format)) =>
val prefix = S3.Folder.coerce("s3://" + bucket + "/" + subpath)
val result = Info(prefix, vendor, name, model, shredJob)
(true, result).asRight
(format, result).asRight
case None =>
DiscoveryFailure.ShreddedTypeKeyFailure(key).asLeft
}
}

sealed trait Extracted
object Extracted {
final case class Legacy(key: SchemaKey) extends Extracted
final case class Tabular(vendor: String, name: String, format: String, model: Int) extends Extracted
}

/**
* Extract `SchemaKey` from subpath, which can be
* json-style (post-0.12.0) vendor=com.acme/name=schema-name/format=jsonschema/version=1-0-0
@@ -242,17 +239,20 @@
* @param subpath S3 subpath of four `SchemaKey` elements
* @return valid schema key if found
*/
def extractSchemaKey(subpath: String): Option[Extracted] =
def extractSchemaKey(subpath: String): Option[(String, String, Int, LoaderMessage.Format)] =
subpath match {
case ShreddedSubpathPattern(vendor, name, format, version) =>
val uri = s"iglu:$vendor/$name/$format/$version"
SchemaKey.fromUri(uri).toOption.map(Extracted.Legacy)
case ShreddedSubpathPatternTabular(vendor, name, format, model) =>
case ShreddedSubpathPattern(vendor, name, model) =>
scala.util.Try(model.toInt).toOption match {
case Some(m) => Some((vendor, name, m, LoaderMessage.Format.JSON))
case None => None
}
case ShreddedSubpathPatternTabular(vendor, name, model) =>
scala.util.Try(model.toInt).toOption match {
case Some(m) => Extracted.Tabular(vendor, name, format, m).some
case Some(m) => Some((vendor, name, m, LoaderMessage.Format.TSV))
case None => None
}
case _ => None
case _ =>
None
}

/**
@@ -266,7 +266,7 @@
* @param path S3 key without bucket name
* @return pair of subpath and shredpath
*/
private def splitFilpath(path: String): (String, String) =
private def splitFilepath(path: String): (String, String) =
path.split("/").reverse.splitAt(MinShreddedPathLengthModern) match {
case (reverseSchema, reversePath) =>
(reversePath.reverse.mkString("/"), reverseSchema.tail.reverse.mkString("/"))
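End to end, key discovery under the new layout first strips the filename, then pattern-matches the remaining four segments; a sketch using only the helpers above (key invented for illustration):

// splitFilepath("run=2021-01-27-12-00-00/vendor=com.acme/name=click/format=tsv/model=1/part-00000")
//   -> ("run=2021-01-27-12-00-00", "vendor=com.acme/name=click/format=tsv/model=1")
ShreddedType.extractSchemaKey("vendor=com.acme/name=click/format=tsv/model=1")
// Some(("com.acme", "click", 1, LoaderMessage.Format.TSV))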
@@ -29,7 +29,7 @@ import com.snowplowanalytics.snowplow.rdbloader.{LoaderError, LoaderAction}
import com.snowplowanalytics.snowplow.rdbloader.LoaderError.StorageTargetError
import com.snowplowanalytics.snowplow.rdbloader.common.StorageTarget
import com.snowplowanalytics.snowplow.rdbloader.db.Decoder
import com.snowplowanalytics.snowplow.rdbloader.loading.Common.SqlString
import com.snowplowanalytics.snowplow.rdbloader.loading.Load.SqlString

import retry.{RetryPolicy, RetryPolicies, RetryDetails, retryingOnAllErrors}

@@ -33,7 +33,7 @@ import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, JDBC, Iglu}


/** Entry-point for loading-related logic */
object Common {
object Load {

type MonadThrow[F[_]] = MonadError[F, Throwable]

@@ -18,7 +18,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.{S3, Config, Step}
import com.snowplowanalytics.snowplow.rdbloader.common.StorageTarget.Redshift
import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType}
import com.snowplowanalytics.snowplow.rdbloader.loading.RedshiftStatements._
import com.snowplowanalytics.snowplow.rdbloader.loading.Common.SqlString
import com.snowplowanalytics.snowplow.rdbloader.loading.Load.SqlString


/**
@@ -57,7 +57,10 @@ object RedshiftStatements {
* More than one `RedshiftLoadStatements` must be grouped with others using `buildQueue`
*/
private[loading] def getStatements(config: Config[Redshift], discovery: DataDiscovery): RedshiftStatements = {
val shreddedStatements = discovery.shreddedTypes.map(transformShreddedType(config, discovery.compression))
val shreddedStatements = discovery
.shreddedTypes
.filterNot(_.isAtomic)
.map(transformShreddedType(config, discovery.compression))
val transitCopy = config.steps.contains(Step.TransitCopy)
val compressionFormat = getCompressionFormat(discovery.compression)
val atomic = buildEventsCopy(config, discovery.atomicEvents, transitCopy, compressionFormat)

This file was deleted.

@@ -24,7 +24,7 @@ import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.rdbloader.common.Config.{Shredder, Compression}
import com.snowplowanalytics.snowplow.rdbloader.common.{S3, Config, LoaderMessage, StorageTarget}
import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig
import com.snowplowanalytics.snowplow.rdbloader.loading.Common.SqlString
import com.snowplowanalytics.snowplow.rdbloader.loading.Load.SqlString

object SpecHelpers {
