Common: use processing manifest (close #81)
chuwy committed Feb 23, 2018
1 parent 40766b0 commit 3b01288
Showing 33 changed files with 953 additions and 142 deletions.
8 changes: 6 additions & 2 deletions build.sbt
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012-2017 Snowplow Analytics Ltd. All rights reserved.
* Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -19,7 +19,7 @@ lazy val loader = project.in(file("."))
mainClass in Compile := Some("com.snowplowanalytics.snowplow.rdbloader.Main")
)
.settings(BuildSettings.buildSettings)
.settings(BuildSettings.scalifySettings)
.settings(BuildSettings.scalifySettings(name in shredder, version in shredder))
.settings(BuildSettings.assemblySettings)
.settings(resolvers ++= Dependencies.resolutionRepos)
.settings(
@@ -34,12 +34,14 @@ lazy val loader = project.in(file("."))
Dependencies.circeYaml,
Dependencies.circeGeneric,
Dependencies.circeGenericExtra,
Dependencies.manifest,

Dependencies.postgres,
Dependencies.redshift,
Dependencies.redshiftSdk,
Dependencies.s3,
Dependencies.ssm,
Dependencies.dynamodb,
Dependencies.jSch,

Dependencies.specs2,
@@ -58,6 +60,7 @@ lazy val shredder = project.in(file("shredder"))
.settings(BuildSettings.buildSettings)
.settings(resolvers ++= Dependencies.resolutionRepos)
.settings(BuildSettings.shredderAssemblySettings)
.settings(BuildSettings.scalifySettings(name, version))
.settings(
libraryDependencies ++= Seq(
// Java
@@ -70,6 +73,7 @@ lazy val shredder = project.in(file("shredder"))
Dependencies.scopt,
Dependencies.commonEnrich,
Dependencies.igluClient,
Dependencies.manifest,
// Scala (test only)
Dependencies.specs2
)
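The loader's scalifySettings call now receives the shredder sub-project's name and version. A minimal sketch of the `setting in project` mechanism relied on here; the ShredderInfo object, its fields, and the generated file name are hypothetical, not the project's actual generator:

    lazy val shredder = project.in(file("shredder"))

    lazy val loader = project.in(file("."))
      .settings(
        sourceGenerators in Compile += Def.task {
          // `(name in shredder).value` resolves the *shredder* sub-project's setting,
          // so the root project can embed it in generated source at compile time
          val shredderName    = (name in shredder).value
          val shredderVersion = (version in shredder).value
          val contents =
            s"""object ShredderInfo {
               |  val name = "$shredderName"
               |  val version = "$shredderVersion"
               |}""".stripMargin
          val file = (sourceManaged in Compile).value / "ShredderInfo.scala"
          IO.write(file, contents)
          Seq(file)
        }.taskValue
      )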
16 changes: 10 additions & 6 deletions project/BuildSettings.scala
@@ -56,7 +56,7 @@ object BuildSettings {
"-target", "1.8"
),

addCompilerPlugin("org.spire-math" % "kind-projector" % "0.9.4" cross CrossVersion.binary)
addCompilerPlugin("org.spire-math" % "kind-projector" % "0.9.6" cross CrossVersion.binary)
)
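The kind-projector bump matters later in this commit: runJob accepts a ProcessingManifest[Either[ManifestError, ?]], and the `?` placeholder for partially applying Either comes from this plugin. A sketch of the two equivalent spellings, assuming the processing-manifest types are on the classpath as they are after this change:

    import com.snowplowanalytics.manifest.core.ProcessingManifest
    import com.snowplowanalytics.manifest.core.ProcessingManifest._

    object KindProjectorExample {
      // Without kind-projector: a type alias is needed to fix Either's error type
      type ManifestFailure[A] = Either[ManifestError, A]
      def withAlias(manifest: ProcessingManifest[ManifestFailure]): Unit = ()

      // With kind-projector: the same type, written inline
      def withProjector(manifest: ProcessingManifest[Either[ManifestError, ?]]): Unit = ()
    }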

// sbt-assembly settings
@@ -102,8 +102,8 @@ object BuildSettings {
case x if x.startsWith("META-INF") => MergeStrategy.discard
case x if x.endsWith(".html") => MergeStrategy.discard
case x if x.endsWith("package-info.class") => MergeStrategy.first
case PathList("com", "google", "common", tail@_*) => MergeStrategy.first
case PathList("org", "apache", "spark", "unused", tail@_*) => MergeStrategy.first
case PathList("com", "google", "common", _) => MergeStrategy.first
case PathList("org", "apache", "spark", "unused", _) => MergeStrategy.first
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
@@ -113,17 +113,21 @@
/**
* Makes package (build) metadata available within source code
*/
lazy val scalifySettings = Seq(
def scalifySettings(shredderName: SettingKey[String], shredderVersion: SettingKey[String]) = Seq(
sourceGenerators in Compile += Def.task {
val file = (sourceManaged in Compile).value / "settings.scala"
IO.write(file, """package com.snowplowanalytics.snowplow.rdbloader.generated
|object ProjectMetadata {
| val version = "%s"
| val name = "%s"
| val name = "%s" // DO NOT EDIT! Processing Manifest depends on it
| val organization = "%s"
| val scalaVersion = "%s"
|
| val shredderName = "%s" // DO NOT EDIT! Processing Manifest depends on it
| val shredderVersion = "%s"
|}
|""".stripMargin.format(version.value, name.value, organization.value, scalaVersion.value))
|""".stripMargin.format(
version.value, name.value, organization.value, scalaVersion.value, shredderName.value, shredderVersion.value))
Seq(file)
}.taskValue
)
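For reference, the file generated by the updated scalifySettings would look roughly like this; every value below is illustrative rather than taken from a real build:

    package com.snowplowanalytics.snowplow.rdbloader.generated

    object ProjectMetadata {
      val version = "0.14.0"                       // illustrative
      val name = "snowplow-rdb-loader"             // DO NOT EDIT! Processing Manifest depends on it
      val organization = "com.snowplowanalytics"   // illustrative
      val scalaVersion = "2.11.12"                 // illustrative

      val shredderName = "snowplow-rdb-shredder"   // DO NOT EDIT! Processing Manifest depends on it
      val shredderVersion = "0.13.0"               // illustrative
    }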
2 changes: 2 additions & 0 deletions project/Dependencies.scala
@@ -24,6 +24,7 @@ object Dependencies {
val circeYaml = "0.7.0"
val circe = "0.9.0"
val cats = "1.0.1"
val manifest = "0.1.0-SNAPSHOT"

// Scala (Shredder)
val spark = "2.2.0"
@@ -62,6 +63,7 @@ object Dependencies {
val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % V.igluClient
val igluCore = "com.snowplowanalytics" %% "iglu-core" % V.igluCore intransitive()
val scalaTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker" % V.scalaTracker
val manifest = "com.snowplowanalytics" %% "processing-manifest" % V.manifest
val cats = "org.typelevel" %% "cats" % V.cats
val catsFree = "org.typelevel" %% "cats-free" % V.cats
val circeCore = "io.circe" %% "circe-core" % V.circe
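Because processing-manifest is pulled in at a -SNAPSHOT version, it only resolves if Dependencies.resolutionRepos contains a snapshot repository. The actual repository is not shown in this hunk; an entry of that general shape might be (assumption, not the project's real resolver list):

    val resolutionRepos = Seq(
      Resolver.sonatypeRepo("snapshots")  // or whichever repository hosts processing-manifest 0.1.0-SNAPSHOT
    )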
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
package com.snowplowanalytics.snowplow.storage.spark

import cats.implicits._

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder

import com.snowplowanalytics.manifest.core.ProcessingManifest._
import com.snowplowanalytics.manifest.dynamodb.DynamoDbManifest
import com.snowplowanalytics.snowplow.rdbloader.generated.ProjectMetadata

object DynamodbManifest {

type ManifestFailure[A] = Either[ManifestError, A]

val ShredJobApplication = Application(ProjectMetadata.name, ProjectMetadata.version, None)

val ShreddedTypesKeys = "processed:shredder:types"

def initialize(tableName: String) = {
val client = AmazonDynamoDBClientBuilder.standard().build()
DynamoDbManifest[ManifestFailure](client, tableName)
}
}
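A hedged usage sketch for the new module, based only on the calls visible in this diff; the table name, S3 run folder, and schema URI are made up, and the processing-manifest API may differ in detail from what is assumed here:

    import scala.util.Try

    import com.snowplowanalytics.manifest.core.ProcessingManifest._
    import com.snowplowanalytics.snowplow.storage.spark.DynamodbManifest

    object ManifestUsageSketch {
      def main(args: Array[String]): Unit = {
        val manifest = DynamodbManifest.initialize("acme-processing-manifest")  // hypothetical table name

        // The unit of work to record: returns an optional payload describing what was produced
        val process = () => Try {
          val shreddedTypes = Set("iglu:com.acme/link_click/jsonschema/1-0-0")  // hypothetical
          Some(Payload.empty.copy(set = Map(DynamodbManifest.ShreddedTypesKeys -> shreddedTypes)))
        }

        manifest.processNewItem(
          "s3://acme-archive/shredded/run=2018-02-23-16-00-00/",                // hypothetical folder
          DynamodbManifest.ShredJobApplication,
          process) match {
          case Right(_)    => println("folder recorded in the processing manifest")
          case Left(error) => println(s"manifest failure: $error")
        }
      }
    }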
@@ -19,6 +19,7 @@ package storage.spark
import java.io.{PrintWriter, StringWriter}
import java.util.UUID

import scala.util.Try
import scala.util.control.NonFatal

// Jackson
@@ -37,6 +38,10 @@ import Scalaz._
// AWS SDK
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException

// Manifest
import com.snowplowanalytics.manifest.core.ProcessingManifest
import com.snowplowanalytics.manifest.core.ProcessingManifest._

// Snowplow
import iglu.client.{JsonSchemaPair, ProcessingMessageNel, Resolver, SchemaKey}
import iglu.client.validation.ProcessingMessageMethods._
@@ -69,17 +74,47 @@ object ShredJob extends SparkJob {
classOf[org.apache.spark.sql.execution.datasources.FileFormatWriter$WriteTaskResult]
)
override def sparkConfig(): SparkConf = new SparkConf()
.setAppName(getClass().getSimpleName())
.setAppName(getClass.getSimpleName)
.setIfMissing("spark.master", "local[*]")
.set("spark.serializer", classOf[KryoSerializer].getName())
.set("spark.serializer", classOf[KryoSerializer].getName)
.registerKryoClasses(classesToRegister)

override def run(spark: SparkSession, args: Array[String]): Unit = {
val job = ShredJob(spark, args)
job.run()
// Job configuration
val shredConfig = ShredJobConfig
.loadConfigFrom(args)
.valueOr(e => throw new FatalEtlError(e.toString))

val job = new ShredJob(spark, shredConfig)

// Processing manifest, which exists only on the driver
val manifest = shredConfig.dynamodbManifestTable.map(DynamodbManifest.initialize)
runJob(manifest, shredConfig.inFolder, job)
}

def apply(spark: SparkSession, args: Array[String]) = new ShredJob(spark, args)
/** Run the job, recording the process in the manifest if one is configured */
def runJob(manifest: Option[ProcessingManifest[Either[ManifestError, ?]]],
path: String,
job: ShredJob): Try[Unit] = {
manifest match {
case None => // Manifest is not enabled; simply run the job
Try(job.run())
case Some(m) => // Manifest is enabled
// Wrap the job in a lazy function to pass to `processNewItem`
val process = () => Try {
job.run()
val shreddedTypes = job.shreddedTypes.value.toSet
val payload: Payload = Payload.empty.copy(set = Map(DynamodbManifest.ShreddedTypesKeys -> shreddedTypes))
Some(payload)
}

m.processNewItem(path, DynamodbManifest.ShredJobApplication, process) match {
case Right(_) => util.Success(())
case Left(ManifestError.ApplicationError(t, _, _)) => util.Failure(t)
case Left(error) => throw new FatalEtlError(error.toString)
}
}
}

/**
* Pipeline the loading of raw lines into shredded JSONs.
@@ -238,7 +273,7 @@
*/
def getAlteredEnrichedOutputPath(outFolder: String): String = {
val alteredEnrichedEventSubdirectory = "atomic-events"
s"${outFolder}${if (outFolder.endsWith("/")) "" else "/"}${alteredEnrichedEventSubdirectory}"
s"$outFolder${if (outFolder.endsWith("/")) "" else "/"}$alteredEnrichedEventSubdirectory"
}

/**
@@ -248,7 +283,7 @@
*/
def getShreddedTypesOutputPath(outFolder: String): String = {
val shreddedTypesSubdirectory = "shredded-types"
s"${outFolder}${if (outFolder.endsWith("/")) "" else "/"}${shreddedTypesSubdirectory}"
s"$outFolder${if (outFolder.endsWith("/")) "" else "/"}$shreddedTypesSubdirectory"
}

/**
@@ -308,16 +343,16 @@ case class Event(eventId: String, newEventId: Option[String], shredded: Shredded
/**
* The Snowplow Shred job, written in Spark.
* @param spark Spark session used throughout the job
* @param args Command line arguments for the shred job
* @param shredConfig Parsed command-line arguments for the shred job
*/
class ShredJob(@transient val spark: SparkSession, args: Array[String]) extends Serializable {
class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) extends Serializable {
@transient private val sc: SparkContext = spark.sparkContext
import spark.implicits._
import singleton._

// Job configuration
private val shredConfig = ShredJobConfig.loadConfigFrom(args)
.valueOr(e => throw new FatalEtlError(e.toString))
// Accumulator to track shredded types
val shreddedTypes = new StringSetAccumulator
sc.register(shreddedTypes)

private val dupStorageConfig = DuplicateStorage.DynamoDbConfig.extract(
shredConfig.duplicateStorageConfig.success,
@@ -328,6 +363,18 @@ class ShredJob(@transient val spark: SparkSession, args: Array[String]) extends
// the table if it doesn't exist
@transient private val _ = DuplicateStorageSingleton.get(dupStorageConfig)

/** Save the set of discovered shredded types into the accumulator, but only if the processing manifest is enabled */
def recordShreddedType(jsonSchemaPairs: List[JsonSchemaPair]): Unit = {
if (shredConfig.dynamodbManifestTable.isEmpty) {
()
} else {
val typesSet = jsonSchemaPairs.toSet[JsonSchemaPair].map {
case (schemaKey, _) => schemaKey.toSchemaUri
}
shreddedTypes.add(typesSet)
}
}

/**
* Runs the shred job by:
* - shredding the Snowplow enriched events
@@ -354,13 +401,17 @@ class ShredJob(@transient val spark: SparkSession, args: Array[String]) extends
val good = common
.flatMap { case (line, shredded) => projectGoods(shredded).map((_, line)) }
.map { case (shred, line) => Shredded(shred._1, shred._2, shred._3, shred._4, line) }
.groupBy(s => (s.eventId, s.eventFingerprint))
.groupBy { s => (s.eventId, s.eventFingerprint) }
.flatMap { case (_, vs) => vs.take(1) }
.map { s =>
val absent = dedupeCrossBatch((s.eventId, s.eventFingerprint, s.etlTstamp),
DuplicateStorageSingleton.get(dupStorageConfig))
(s, absent)
}
.map { case original @ (Shredded(_, _, shreds, _, _), _) =>
recordShreddedType(shreds)
original
}
.cache()

// Deduplication operation succeeded
Expand All @@ -373,7 +424,7 @@ class ShredJob(@transient val spark: SparkSession, args: Array[String]) extends

// Count synthetic duplicates, defined as events with the same id but different fingerprints
val syntheticDupes = dupeSucceeded
.map(_._1)
.map { case (shredded, _) => shredded }
.groupBy(_.eventId)
.filter { case (_, vs) => vs.size > 1 }
.map { case (k, vs) => (k, vs.size.toLong) }
@@ -398,6 +449,7 @@ class ShredJob(@transient val spark: SparkSession, args: Array[String]) extends
case Failure(m) => Some(Row(BadRow(s.originalLine, m).toCompactJson))
case _ => None
} }

spark.createDataFrame(badDupes, StructType(StructField("_", StringType, true) :: Nil))
.write
.mode(SaveMode.Overwrite)
@@ -36,7 +36,8 @@ case class ShredJobConfig(
outFolder: String = "",
badFolder: String = "",
igluConfig: String = "",
duplicateStorageConfig: Option[String] = None
duplicateStorageConfig: Option[String] = None,
dynamodbManifestTable: Option[String] = None
)

object ShredJobConfig {
@@ -57,6 +58,9 @@
opt[String]("duplicate-storage-config").optional().valueName("<duplicate storage config")
.action((d, c) => c.copy(duplicateStorageConfig = Some(d)))
.text("Duplicate storage configuration")
opt[String]("processing-manifest-table").optional().valueName("<dynamodb table name>")
.action((d, c) => c.copy(dynamodbManifestTable = Some(d)))
.text("Processing manifest table")
help("help").text("Prints this usage text")
}

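A self-contained scopt sketch of how the new flag is parsed; the Config class, program name, and example value below are simplified stand-ins rather than the real ShredJobConfig:

    case class Config(manifestTable: Option[String] = None)

    object OptionParsingSketch {
      val parser = new scopt.OptionParser[Config]("shred") {
        opt[String]("processing-manifest-table").optional().valueName("<dynamodb table name>")
          .action((d, c) => c.copy(manifestTable = Some(d)))
          .text("Processing manifest table")
      }

      def main(args: Array[String]): Unit = {
        // prints Some(Config(Some(acme-manifest)))
        println(parser.parse(Seq("--processing-manifest-table", "acme-manifest"), Config()))
      }
    }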
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.storage.spark

import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

import StringSetAccumulator._

class StringSetAccumulator extends AccumulatorV2[KeyAccum, KeyAccum] {

private val accum = mutable.Set.empty[String]

def merge(other: AccumulatorV2[KeyAccum, KeyAccum]): Unit = other match {
case o: StringSetAccumulator => accum ++= o.accum
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}

def isZero: Boolean = accum.isEmpty

def copy(): AccumulatorV2[KeyAccum, KeyAccum] = {
val newAcc = new StringSetAccumulator
accum.synchronized {
newAcc.accum ++= accum
}
newAcc
}

def value = accum

def add(keys: KeyAccum): Unit = {
accum ++= keys
}

def add(keys: Set[String]): Unit = {
val mutableSet = mutable.Set(keys.toList: _*)
add(mutableSet)
}

def reset(): Unit = {
accum.clear()
}
}

object StringSetAccumulator {
type KeyAccum = mutable.Set[String]
}
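A driver-side sketch of how this accumulator is meant to be used, mirroring the registration and reads added to ShredJob; the local Spark session and the schema URIs are assumptions:

    import org.apache.spark.sql.SparkSession

    object StringSetAccumulatorSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("accumulator-sketch").getOrCreate()
        val sc = spark.sparkContext

        val shreddedTypes = new StringSetAccumulator
        sc.register(shreddedTypes)                       // must be registered before use, as in ShredJob

        sc.parallelize(Seq(
            "iglu:com.acme/link_click/jsonschema/1-0-0",
            "iglu:com.acme/link_click/jsonschema/1-0-0", // duplicates collapse into the set
            "iglu:com.acme/page_ping/jsonschema/1-0-0"))
          .foreach(uri => shreddedTypes.add(Set(uri)))   // executors add; Spark merges copies on the driver

        println(shreddedTypes.value)                     // two distinct schema URIs
        spark.stop()
      }
    }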