Skip to content

Commit

Permalink
RDB Shredder: factor out scala-common-enrich (close #138)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Apr 17, 2019
1 parent 3b93c2e commit bd5b423
Show file tree
Hide file tree
Showing 27 changed files with 982 additions and 1,130 deletions.
14 changes: 11 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,23 @@ lazy val shredder = project.in(file("shredder"))
// Java
Dependencies.dynamodb,
// Scala
Dependencies.decline,
Dependencies.analyticsSdk,
Dependencies.eventsManifest,
Dependencies.circeJawn,
Dependencies.circeLiteral,
Dependencies.sparkCore,
Dependencies.sparkSQL,
Dependencies.scalaz7,
Dependencies.scopt,
Dependencies.commonEnrich,
Dependencies.igluClient,
Dependencies.igluCoreCirce,
Dependencies.manifest,
// Scala (test only)
Dependencies.specs2
),

dependencyOverrides ++= Seq(
Dependencies.dynamodb,
"com.fasterxml.jackson.core" % "jackson-core" % "2.6.7",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.2"
)
)
2 changes: 1 addition & 1 deletion project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object BuildSettings {
*/
lazy val buildSettings = Seq(
organization := "com.snowplowanalytics",
scalaVersion := "2.11.11",
scalaVersion := "2.11.12",

scalacOptions ++= Seq(
"-deprecation",
Expand Down
30 changes: 15 additions & 15 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@ object Dependencies {

object V {
// Scala (Loader)
val decline = "0.6.2"
val scopt = "3.6.0"
val scalaz7 = "7.0.9"
val igluClient = "0.5.0"
val igluClient = "0.6.0-M5"
val igluCore = "0.2.0"
val scalaTracker = "0.5.0"
val circeYaml = "0.8.0"
val circe = "0.9.3"
val circe = "0.11.1"
val cats = "1.1.0"
val manifest = "0.1.0"
val fs2 = "0.10.5"
val manifest = "0.2.0-M1"
val fs2 = "1.0.4"

// Scala (Shredder)
val analyticsSdk = "0.4.1"
val spark = "2.2.0"
val commonEnrich = "0.32.0"
val eventsManifest = "0.2.0-M2"

// Java (Loader)
val postgres = "42.0.0"
Expand All @@ -44,17 +46,12 @@ object Dependencies {


val resolutionRepos = Seq(
// For specs2
"scalaz-bintray" at "http://dl.bintray.com/scalaz/releases",
// Redshift native driver
"redshift" at "http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release",
// For Snowplow libs (SCE transient)
"Snowplow Analytics Maven repo" at "http://maven.snplow.com/releases/",
// For uaParser utils (SCE transient)
"user-agent-parser repo" at "https://clojars.org/repo/"
"redshift" at "http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release"
)

// Scala (Loader)
val decline = "com.monovore" %% "decline" % V.decline
val scopt = "com.github.scopt" %% "scopt" % V.scopt
val scalaz7 = "org.scalaz" %% "scalaz-core" % V.scalaz7
val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % V.igluClient
Expand All @@ -70,9 +67,12 @@ object Dependencies {
val fs2 = "co.fs2" %% "fs2-core" % V.fs2

// Scala (Shredder)
val commonEnrich = "com.snowplowanalytics" %% "snowplow-common-enrich" % V.commonEnrich
val sparkCore = "org.apache.spark" %% "spark-core" % V.spark % "provided"
val sparkSQL = "org.apache.spark" %% "spark-sql" % V.spark % "provided"
val analyticsSdk = "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % V.analyticsSdk
val eventsManifest = "com.snowplowanalytics" %% "snowplow-events-manifest" % V.eventsManifest
val circeJawn = "io.circe" %% "circe-jawn" % V.circe
val circeLiteral = "io.circe" %% "circe-literal" % V.circe
val sparkCore = "org.apache.spark" %% "spark-core" % V.spark % "provided"
val sparkSQL = "org.apache.spark" %% "spark-sql" % V.spark % "provided"

// Java (Loader)
val postgres = "org.postgresql" % "postgresql" % V.postgres
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.snowplowanalytics.snowplow.storage.spark

import io.circe.{Encoder, Json}
import io.circe.syntax._
import cats.data.NonEmptyList
import com.snowplowanalytics.iglu.client.ClientError
import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event


sealed trait BadRow {
def toCompactJson: String
}

object BadRow {


case class ShreddingError(original: String, errors: NonEmptyList[String]) extends BadRow {
def toCompactJson: String =
Json.obj(
"original" := original.asJson,
"errors" := errors.asJson
).noSpaces
}

case class ValidationError(original: Event, errors: NonEmptyList[SchemaError]) extends BadRow {
def toCompactJson: String =
Json.obj(
"original" := original.asJson,
"errors" := errors.asJson
).noSpaces
}

case class DeduplicationError(original: Event, error: String) extends BadRow {
def toCompactJson: String = Json.obj(
"original" := original.asJson,
"error" := error.asJson
).noSpaces
}

case class SchemaError(schema: SchemaKey, error: ClientError)

implicit val schemaErrorCirceJsonEncoder: Encoder[SchemaError] =
Encoder.instance { case SchemaError(schema, error) =>
error.asJson.deepMerge(Json.obj("schema" := schema.toSchemaUri.asJson))
}
}
Loading

0 comments on commit bd5b423

Please sign in to comment.