RDB Loader: add tabular output loading (close #152)
chuwy committed Jun 25, 2019
1 parent cfd7f61 commit 28d0a26
Showing 30 changed files with 851 additions and 319 deletions.
23 changes: 23 additions & 0 deletions build.sbt
@@ -11,6 +11,25 @@
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/

lazy val common = project.in(file("common"))
.settings(Seq(
name := "snowplow-rdb-loader-common"
))
.settings(BuildSettings.buildSettings)
.settings(resolvers ++= Dependencies.resolutionRepos)
.settings(
libraryDependencies ++= Seq(
Dependencies.igluClient,
Dependencies.igluCoreCirce,
Dependencies.scalaTracker,
Dependencies.scalaTrackerEmit,
Dependencies.circeGeneric,
Dependencies.circeGenericExtra,
Dependencies.circeLiteral,
Dependencies.schemaDdl
)
)

lazy val loader = project.in(file("."))
.settings(
name := "snowplow-rdb-loader",
@@ -26,6 +45,7 @@ lazy val loader = project.in(file("."))
libraryDependencies ++= Seq(
Dependencies.decline,
Dependencies.igluClient,
Dependencies.igluCore,
Dependencies.igluCoreCirce,
Dependencies.scalaTracker,
Dependencies.scalaTrackerEmit,
@@ -36,6 +56,7 @@ lazy val loader = project.in(file("."))
Dependencies.circeLiteral,
Dependencies.manifest,
Dependencies.fs2,
Dependencies.schemaDdl,

Dependencies.postgres,
Dependencies.redshift,
@@ -50,6 +71,7 @@ lazy val loader = project.in(file("."))
Dependencies.scalaCheck
)
)
.dependsOn(common)

lazy val shredder = project.in(file("shredder"))
.settings(
@@ -89,3 +111,4 @@ lazy val shredder = project.in(file("shredder"))
"com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.2"
)
)
.dependsOn(common)
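
Taken together, the build changes introduce a shared `common` module and make both existing modules depend on it. A minimal sketch of the resulting module graph, with all settings elided:

lazy val common   = project.in(file("common"))              // shared Iglu/schema-ddl code
lazy val loader   = project.in(file(".")).dependsOn(common)
lazy val shredder = project.in(file("shredder")).dependsOn(common)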
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2012-2019 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.common

import io.circe.Json

import cats.Monad
import cats.data.EitherT
import cats.syntax.either._
import cats.effect.Clock

import com.snowplowanalytics.iglu.core._
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.ClientError.ResolutionError

import com.snowplowanalytics.iglu.schemaddl.IgluSchema
import com.snowplowanalytics.iglu.schemaddl.migrations.Migration.OrderedSchemas
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._

object Flattening {
/**
* Errors specific to shredding a JSON instance into tabular format:
* either the `SchemaList` is unavailable (no Iglu Server hosts these schemas),
* or a particular schema could not be fetched, so the whole flattening algorithm cannot be built
*/
sealed trait FlatteningError
object FlatteningError {
case class SchemaListResolution(error: ResolutionError) extends FlatteningError
case class SchemaResolution(error: ResolutionError) extends FlatteningError
case class Parsing(error: String) extends FlatteningError
}

// Cache = Map[SchemaKey, OrderedSchemas]

def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], key: SchemaKey): EitherT[F, FlatteningError, OrderedSchemas] =
getOrdered(resolver, key.vendor, key.name, key.version.model)

def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], vendor: String, name: String, model: Int): EitherT[F, FlatteningError, OrderedSchemas] =
for {
schemaList <- EitherT[F, ResolutionError, SchemaList](resolver.listSchemas(vendor, name, Some(model))).leftMap(FlatteningError.SchemaListResolution)
ordered <- OrderedSchemas.fromSchemaList(schemaList, fetch(resolver))
} yield ordered

def fetch[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F])(key: SchemaKey): EitherT[F, FlatteningError, IgluSchema] =
for {
json <- EitherT(resolver.lookupSchema(key, 2)).leftMap(FlatteningError.SchemaResolution)
schema <- EitherT.fromEither(parseSchema(json))
} yield schema

/** Parse JSON into self-describing schema, or return `FlatteningError` */
private def parseSchema(json: Json): Either[FlatteningError, IgluSchema] =
for {
selfDescribing <- SelfDescribingSchema.parse(json).leftMap(code => FlatteningError.Parsing(s"Cannot parse ${json.noSpaces} payload as self-describing schema, ${code.code}"))
parsed <- Schema.parse(selfDescribing.schema).toRight(FlatteningError.Parsing(s"Cannot parse ${selfDescribing.self.schemaKey.toSchemaUri} payload as JSON Schema"))
} yield SelfDescribingSchema(selfDescribing.self, parsed)

}
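
For orientation, a hypothetical caller of `getOrdered`; `igluResolver: Resolver[IO]` and the required `RegistryLookup[IO]` and `Clock[IO]` instances are assumed to be in scope, and the schema coordinates are illustrative:

import cats.effect.IO
import com.snowplowanalytics.iglu.core.{ SchemaKey, SchemaVer }
import com.snowplowanalytics.iglu.schemaddl.migrations.Migration.OrderedSchemas

// Fetch and order the whole 1-*-* schema family for a vendor/name pair
val key = SchemaKey("com.acme", "checkout", "jsonschema", SchemaVer.Full(1, 0, 0))

val ordered: IO[Either[Flattening.FlatteningError, OrderedSchemas]] =
  Flattening.getOrdered(igluResolver, key).value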
1 change: 0 additions & 1 deletion project/BuildSettings.scala
@@ -42,7 +42,6 @@ object BuildSettings {
"-unchecked",
"-Ywarn-unused-import",
"-Ywarn-nullary-unit",
"-Xfatal-warnings",
"-Xlint",
"-Yinline-warnings",
"-language:higherKinds",
1 change: 1 addition & 0 deletions project/Dependencies.scala
@@ -57,6 +57,7 @@ object Dependencies {
val scalaTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.scalaTracker
val scalaTrackerEmit = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-id" % V.scalaTracker
val manifest = "com.snowplowanalytics" %% "snowplow-processing-manifest" % V.manifest
val igluCore = "com.snowplowanalytics" %% "iglu-core" % V.igluCore
val igluCoreCirce = "com.snowplowanalytics" %% "iglu-core-circe" % V.igluCore
val cats = "org.typelevel" %% "cats" % V.cats
val catsFree = "org.typelevel" %% "cats-free" % V.cats
@@ -27,20 +27,18 @@ import cats.syntax.show._
import cats.effect.Clock

import com.snowplowanalytics.iglu.core._
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.ClientError.ResolutionError

import com.snowplowanalytics.iglu.schemaddl.IgluSchema
import com.snowplowanalytics.iglu.schemaddl.migrations.FlatData
import com.snowplowanalytics.iglu.schemaddl.migrations.Migration.OrderedSchemas
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

import com.snowplowanalytics.snowplow.rdbloader.common.Flattening.{ getOrdered, FlatteningError }

object EventUtils {
/**
* Ready the enriched event for database load by removing a few JSON fields and truncating field
@@ -84,18 +82,6 @@ object EventUtils {
List(schema.vendor, schema.name, schema.format, schema.version.asString,
rootId.toString, rootTstamp.formatted, "events", s"""["events","${schema.name}"]""", "events")

/**
* Error specific to shredding JSON instance into tabular format
* `SchemaList` is unavailable (in case no Iglu Server hosts this schemas)
* Particular schema could not be fetched, thus whole flattening algorithm cannot be built
*/
sealed trait FlatteningError
object FlatteningError {
case class SchemaListResolution(error: ResolutionError) extends FlatteningError
case class SchemaResolution(error: ResolutionError) extends FlatteningError
case class Parsing(error: String) extends FlatteningError
}

/**
* Transform a self-describing entity into tabular format, using its known schemas to get the correct order of columns
* @param resolver Iglu resolver to get the list of known schemas
@@ -105,27 +91,6 @@
def flatten[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], instance: SelfDescribingData[Json]): EitherT[F, FlatteningError, List[String]] =
getOrdered(resolver, instance.schema).map { ordered => FlatData.flatten(instance.data, ordered) }

// Cache = Map[SchemaKey, OrderedSchemas]

def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], key: SchemaKey) =
for {
schemaList <- EitherT[F, ResolutionError, SchemaList](resolver.listSchemas(key.vendor, key.name, Some(key.version.model))).leftMap(FlatteningError.SchemaListResolution)
ordered <- OrderedSchemas.fromSchemaList(schemaList, fetch(resolver))
} yield ordered

def fetch[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F])(key: SchemaKey): EitherT[F, FlatteningError, IgluSchema] =
for {
json <- EitherT(resolver.lookupSchema(key, 2)).leftMap(FlatteningError.SchemaResolution)
schema <- EitherT.fromEither(parseSchema(json))
} yield schema

/** Parse JSON into self-describing schema, or return `FlatteningError` */
private def parseSchema(json: Json): Either[FlatteningError, IgluSchema] =
for {
selfDescribing <- SelfDescribingSchema.parse(json).leftMap(code => FlatteningError.Parsing(s"Cannot parse ${json.noSpaces} payload as self-describing schema, ${code.code}"))
parsed <- Schema.parse(selfDescribing.schema).toRight(FlatteningError.Parsing(s"Cannot parse ${selfDescribing.self.schemaKey.toSchemaUri} payload as JSON Schema"))
} yield SelfDescribingSchema(selfDescribing.self, parsed)

/** Get maximum length for a string value */
private def getLength(schema: Schema): Option[Int] =
schema.maxLength.map(_.value.toInt)
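
A sketch of how the slimmed-down `flatten` might be called on a single entity; `resolver: Resolver[IO]` plus `RegistryLookup[IO]` and `Clock[IO]` instances are assumed in scope, and the schema coordinates are illustrative:

import cats.data.EitherT
import cats.effect.IO
import io.circe.literal._
import com.snowplowanalytics.iglu.core.{ SchemaKey, SchemaVer, SelfDescribingData }
import com.snowplowanalytics.snowplow.rdbloader.common.Flattening.FlatteningError

val entity = SelfDescribingData(
  SchemaKey("com.acme", "checkout", "jsonschema", SchemaVer.Full(1, 0, 0)),
  json"""{"total": 42.0, "currency": "USD"}""")

// Columns come back in schema-defined order (stable across the 1-*-* family),
// not in the JSON object's key order
val row: EitherT[IO, FlatteningError, List[String]] =
  EventUtils.flatten(resolver, entity)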
@@ -20,6 +20,8 @@ import cats.implicits._

import com.snowplowanalytics.manifest.core.{ Item, Application }

import com.snowplowanalytics.iglu.schemaddl.migrations.Migration.OrderedSchemas

// This library
import Security.Tunnel
import loaders.Common.SqlString
@@ -71,6 +73,9 @@ object LoaderA {
// Security ops
case class GetEc2Property(name: String) extends LoaderA[Either[LoaderError, String]]

// Iglu ops
case class GetSchemas(vendor: String, name: String, model: Int) extends LoaderA[Either[LoaderError, OrderedSchemas]]


def listS3(bucket: S3.Folder): Action[Either[LoaderError, List[S3.BlobObject]]] =
Free.liftF[LoaderA, Either[LoaderError, List[S3.BlobObject]]](ListS3(bucket))
@@ -171,5 +176,10 @@ object LoaderA {
/** Retrieve decrypted property from EC2 Parameter Store */
def getEc2Property(name: String): Action[Either[LoaderError, String]] =
Free.liftF[LoaderA, Either[LoaderError, String]](GetEc2Property(name))


/** Retrieve list of schemas from Iglu Server */
def getSchemas(vendor: String, name: String, model: Int): Action[Either[LoaderError, OrderedSchemas]] =
Free.liftF[LoaderA, Either[LoaderError, OrderedSchemas]](GetSchemas(vendor, name, model))
}
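
The new op rides on the same Free plumbing as the existing ones. A minimal sketch of building (not interpreting) an action with it, using illustrative schema coordinates:

// Composes like any other step of the DSL
val fetchSchemas: Action[Either[LoaderError, OrderedSchemas]] =
  LoaderA.getSchemas("com.acme", "checkout", 1)

// And combines with further steps via the usual Free combinators
val schemasFound: Action[Boolean] = fetchSchemas.map(_.isRight)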

@@ -17,7 +17,8 @@ import cats.implicits._
import cats.data.ValidatedNel

import com.snowplowanalytics.manifest.core.ManifestError
import com.snowplowanalytics.manifest.core.ManifestError._

import com.snowplowanalytics.snowplow.rdbloader.discovery.DiscoveryFailure

/** Root error type */
sealed trait LoaderError
@@ -31,6 +32,7 @@ object LoaderError {
case l: StorageTargetError => "Data loading error " + l.message
case l: LoaderLocalError => "Internal Exception " + l.message
case m: LoadManifestError => "Load Manifest: " + m.message
case m: MigrationError => s"Table migration error. Please check the table consistency. ${m.message}"
}
}

@@ -58,110 +60,22 @@ object LoaderError {
/** `atomic.manifest` prevents this folder from being loaded */
case class LoadManifestError(message: String) extends LoaderError

/**
* Discovery failure. Represents failure of single step.
* Multiple failures can be aggregated into `DiscoveryError`,
* which is top-level `LoaderError`
*/
sealed trait DiscoveryFailure {
def getMessage: String
}

/**
* Cannot find JSONPaths file
*/
case class JsonpathDiscoveryFailure(jsonpathFile: String) extends DiscoveryFailure {
def getMessage: String =
s"JSONPath file [$jsonpathFile] was not found"
}

/**
* Cannot find `atomic-events` folder on S3
*/
case class AtomicDiscoveryFailure(path: String) extends DiscoveryFailure {
def getMessage: String =
s"Folder with atomic-events was not found in [$path]"
}

/**
* Cannot download file from S3
*/
case class DownloadFailure(key: S3.Key, message: String) extends DiscoveryFailure {
def getMessage: String =
s"Cannot download S3 object [$key].\n$message"
}

/**
* General S3 Exception
*/
case class S3Failure(error: String) extends DiscoveryFailure {
def getMessage = error
}

/**
* Invalid path for S3 key
*/
case class ShreddedTypeKeyFailure(path: S3.Key) extends DiscoveryFailure {
def getMessage: String =
s"Cannot extract contexts or self-describing events from file [$path]. " +
s"Corrupted shredded/good state or unexpected Snowplow Shred job version"
}

/**
* No data, while it **must** be present. Happens only with passed `--folder`, because on
* global discovery folder can be empty e.g. due eventual consistency
* @param path path, where data supposed to be found
*/
case class NoDataFailure(path: S3.Folder) extends DiscoveryFailure {
def getMessage: String =
s"No data discovered in [$path], while RDB Loader was explicitly pointed to it by '--folder' option. " +
s"Possible reasons: S3 eventual consistency or folder does not contain any files"

// Message for enabled manifest
def getManifestMessage: String =
s"Processing manifest does not have unprocessed item [$path]. It can be there, but " +
"already loaded by RDB Loader or unprocessed by RDB Shredder"
}

/**
* Cannot discovery shredded type in folder
*/
case class ShreddedTypeDiscoveryFailure(path: S3.Folder, invalidKeyCount: Int, example: S3.Key) extends DiscoveryFailure {
def getMessage: String =
s"Cannot extract contexts or self-describing events from directory [$path].\nInvalid key example: $example. Total $invalidKeyCount invalid keys.\nCorrupted shredded/good state or unexpected Snowplow Shred job version"
}

case class ManifestFailure(manifestError: ManifestError) extends DiscoveryFailure {
def getMessage: String = manifestError.show
override def toString: String = getMessage
}

/** Turn non-empty list of discovery failures into top-level `LoaderError` */
def flattenValidated[A](validated: ValidatedNel[DiscoveryFailure, A]): Either[LoaderError, A] =
validated.leftMap(errors => DiscoveryError(errors.toList): LoaderError).toEither

def fromManifestError(manifestError: ManifestError): LoaderError =
DiscoveryError(ManifestFailure(manifestError))
DiscoveryFailure.ManifestFailure(manifestError).toLoaderError

/** Other errors */
case class LoaderLocalError(message: String) extends LoaderError

/** Exception wrapper to pass to processing manifest */
case class LoaderThrowable(origin: LoaderError) extends Throwable {
override def getMessage: String = origin.show
}

/**
* Aggregate some failures into more compact error-list to not pollute end-error
*/
def aggregateDiscoveryFailures(failures: List[DiscoveryFailure]): List[DiscoveryFailure] = {
val (shreddedTypeFailures, otherFailures) = failures.span(_.isInstanceOf[ShreddedTypeKeyFailure])
val casted = shreddedTypeFailures.asInstanceOf[List[ShreddedTypeKeyFailure]]
val aggregatedByDir = casted.groupBy { failure =>
S3.Key.getParent(failure.path) }.map {
case (k, v) => ShreddedTypeDiscoveryFailure(k, v.length, v.head.path)
}.toList

aggregatedByDir ++ otherFailures
}
/** Other errors */
case class LoaderLocalError(message: String) extends LoaderError

/** Error happened during DDL-statements execution. Critical */
case class MigrationError(message: String) extends LoaderError

}
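
Assuming the pattern match above backs an implicit `Show[LoaderError]` (as the `origin.show` call in `LoaderThrowable` suggests), the new error renders like this; the message text is illustrative:

import cats.syntax.show._

val err: LoaderError = LoaderError.MigrationError("cannot add column to atomic.com_acme_checkout_1")
println(err.show)
// Table migration error. Please check the table consistency. cannot add column to atomic.com_acme_checkout_1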
@@ -12,6 +12,7 @@
*/
package com.snowplowanalytics.snowplow.rdbloader

import cats.syntax.flatMap._
import cats.data.Validated._

// This project
@@ -52,7 +53,7 @@ object Main {
val interpreter = Interpreter.initialize(config)

val actions: Action[Int] = for {
data <- discover(config).value
data <- discover(config).flatTap(db.Migration.perform(config.target.schema)).value
result <- data match {
case Right(discovery) => load(config, discovery).value
case Left(LoaderError.StorageTargetError(message)) =>
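
The `flatTap` is the crux of the change: table migrations run as an effect on the discovery result, while the result itself flows through to `load` unchanged, and a failed migration short-circuits like any other error. A self-contained illustration of that semantics with simplified types (the loader's actual effect stack is more involved):

import cats.data.EitherT
import cats.instances.option._
import cats.syntax.flatMap._

val discovery: EitherT[Option, String, Int] = EitherT.rightT[Option, String](42)
def migrate(n: Int): EitherT[Option, String, Unit] = EitherT.rightT[Option, String](())

// flatTap sequences `migrate` but keeps the original discovery value
val result = discovery.flatTap(migrate)
assert(result.value == Some(Right(42)))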