RDB Shredder: persist synthetic duplicates on disk (close #142)
chuwy committed Apr 17, 2019
1 parent bd5b423 commit 4408f7c
Showing 4 changed files with 39 additions and 18 deletions.
6 changes: 3 additions & 3 deletions project/Dependencies.scala
@@ -19,8 +19,8 @@ object Dependencies {
     val decline = "0.6.2"
     val scopt = "3.6.0"
     val scalaz7 = "7.0.9"
-    val igluClient = "0.6.0-M5"
-    val igluCore = "0.2.0"
+    val igluClient = "0.6.0-M6"
+    val igluCore = "0.5.0-M1"
     val scalaTracker = "0.5.0"
     val circeYaml = "0.8.0"
     val circe = "0.11.1"
@@ -29,7 +29,7 @@ object Dependencies {
     val fs2 = "1.0.4"

     // Scala (Shredder)
-    val analyticsSdk = "0.4.1"
+    val analyticsSdk = "0.4.2-M1"
     val spark = "2.2.0"
     val eventsManifest = "0.2.0-M2"
@@ -24,7 +24,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder

 import com.snowplowanalytics.iglu.client.Client
 import com.snowplowanalytics.iglu.client.resolver.registries.{Registry, RegistryError, RegistryLookup}
-import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
+import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData, SchemaList}

 import com.snowplowanalytics.manifest.ItemId
 import com.snowplowanalytics.manifest.core.{Application, ManifestError}
@@ -46,6 +46,8 @@ object DynamodbManifest {
   implicit val manifestFailureLookup: RegistryLookup[ManifestFailure] = new RegistryLookup[ManifestFailure] {
     def lookup(repositoryRef: Registry, schemaKey: SchemaKey): ManifestFailure[Either[RegistryError, Json]] =
       RegistryLookup[Id].lookup(repositoryRef, schemaKey).asRight
+    def list(registry: Registry, vendor: String, name: String): ManifestFailure[Option[SchemaList]] =
+      RegistryLookup[Id].list(registry, vendor, name).asRight
   }

   case class ShredderManifest(manifest: DynamoDbManifest[ManifestFailure], itemId: ItemId)
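The new `list` method mirrors the existing `lookup`: both delegate to the synchronous `RegistryLookup[Id]` instance and lift the result into the failure-capable effect with `.asRight`. Below is a minimal, self-contained sketch of that delegation pattern using simplified stand-in types; the real `RegistryLookup`, `Registry`, `SchemaList` and `ManifestFailure` types come from iglu-scala-client and the manifest library, so all names here are illustrative only.

```scala
import cats.Id
import cats.syntax.either._

object DelegationSketch {
  // Stand-in for ManifestFailure; the error type is arbitrary for the example
  type Failure[A] = Either[String, A]

  // Stand-in for RegistryLookup, with the same two operations
  trait Lookup[F[_]] {
    def lookup(schemaKey: String): F[Option[String]]
    def list(vendor: String, name: String): F[List[String]]
  }

  // A synchronous instance, analogous to RegistryLookup[Id]
  val idLookup: Lookup[Id] = new Lookup[Id] {
    def lookup(schemaKey: String): Id[Option[String]] = Some(s"schema body for $schemaKey")
    def list(vendor: String, name: String): Id[List[String]] = List(s"$vendor/$name/jsonschema/1-0-0")
  }

  // Every operation runs the Id instance and wraps the result with .asRight,
  // which is the shape of manifestFailureLookup in the diff above
  implicit val failureLookup: Lookup[Failure] = new Lookup[Failure] {
    def lookup(schemaKey: String): Failure[Option[String]] = idLookup.lookup(schemaKey).asRight
    def list(vendor: String, name: String): Failure[List[String]] = idLookup.list(vendor, name).asRight
  }
}
```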
@@ -21,8 +21,10 @@ import cats.instances.list._
 import cats.syntax.show._
 import cats.syntax.either._
 import cats.syntax.foldable._
+
 import io.circe.Json
 import io.circe.literal._
+
 import java.util.UUID
 import java.time.Instant
 import java.time.format.DateTimeParseException
@@ -35,6 +37,7 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.storage.StorageLevel

 // AWS SDK
 import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException
@@ -303,6 +306,7 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)
     // Enriched TSV lines along with their shredded components
     val common = input
       .map(line => loadAndShred(ResolverSingleton.get(shredConfig.igluConfig), line))
+      .setName("common")
       .cache()

     // Handling of malformed rows; drop good, turn malformed into `BadRow`
@@ -322,7 +326,6 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)
         (first.original, absent)
       }
       .setName("good")
-      .cache()

     // Deduplication operation succeeded
     val dupeSucceeded = good
@@ -334,7 +337,6 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)
         recordPayload(event.inventory.map(_.schemaKey))
         event
       }
-      .cache()

     // Count synthetic duplicates, defined as events with the same id but different fingerprints
     val syntheticDupes = dupeSucceeded
@@ -346,21 +348,30 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)

     // Join the properly-formed events with the synthetic duplicates, generate a new event ID for
     // those that are synthetic duplicates
-    val goodWithSyntheticDupes = dupeSucceeded
+    val identifiedSyntheticDupes = dupeSucceeded
       .map(event => event.event_id -> event)
       .leftOuterJoin(syntheticDupes)
-      .map {
-        case (_, (shredded, None)) =>
-          shredded
-        case (_, (shredded, Some(_))) =>
-          val newEventId = UUID.randomUUID()
-          val newContext = SelfDescribingData(DuplicateSchema, json"""{"originalEventId":${shredded.event_id}}""")
-          val updatedContexts = newContext :: shredded.derived_contexts.data
-          shredded.copy(event_id = newEventId, derived_contexts = Contexts(updatedContexts))
-      }
-      .setName("goodWithSyntheticDupes")
+      .setName("identifiedSyntheticDupes")
       .cache()
+
+    val uniqueGood = identifiedSyntheticDupes.flatMap {
+      case (_, (shredded, None)) => Some(shredded)
+      case _ => None
+    }.setName("uniqueGood")
+
+    // Avoid recomputing UUID at all costs in order to not create orphan shredded entities
+    val syntheticDupedGood = identifiedSyntheticDupes.flatMap {
+      case (_, (shredded, Some(_))) =>
+        val newEventId = UUID.randomUUID()
+        val newContext = SelfDescribingData(DuplicateSchema, json"""{"originalEventId":${shredded.event_id}}""")
+        val updatedContexts = newContext :: shredded.derived_contexts.data
+        Some(shredded.copy(event_id = newEventId, derived_contexts = Contexts(updatedContexts)))
+      case _ =>
+        None
+    }.persist(StorageLevel.MEMORY_AND_DISK_SER).setName("syntheticDupedGood")
+
+    val goodWithSyntheticDupes = (uniqueGood ++ syntheticDupedGood).cache().setName("goodWithSyntheticDupes")

     // Ready the events for database load
     val events = goodWithSyntheticDupes.map(e => Row(alterEnrichedEvent(e)))
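The `persist(StorageLevel.MEMORY_AND_DISK_SER)` call is the heart of the change: generating `UUID.randomUUID()` inside a transformation is non-deterministic, so if that RDD is only memory-cached and a partition is evicted and recomputed, the atomic events and the shredded entities derived from them can end up with different event ids, i.e. orphan shredded entities. Spilling the duplicated subset to disk keeps the generated ids stable across the multiple outputs that read it, and the `++` union then reattaches the re-identified duplicates to the unique events before `goodWithSyntheticDupes` is cached. A minimal, self-contained sketch of the idea (hypothetical names, not the project's code):

```scala
import java.util.UUID

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Sketch: why an RDD produced by a non-deterministic transformation should be
// persisted (spillable to disk) before it is consumed by more than one output.
object PersistSyntheticDupesSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("persist-sketch").getOrCreate()
  val sc    = spark.sparkContext

  val events = sc.parallelize(1 to 100).map(i => s"event-$i")

  // Non-deterministic: every recomputation of a partition would mint fresh UUIDs
  val reIdentified = events
    .map(payload => UUID.randomUUID().toString -> payload)
    // MEMORY_AND_DISK_SER: blocks that do not fit in memory are stored on disk in
    // serialized form instead of being dropped, so later reads see the same UUIDs
    // rather than triggering a recomputation
    .persist(StorageLevel.MEMORY_AND_DISK_SER)
    .setName("reIdentified")

  // Two downstream "outputs" (stand-ins for atomic events and shredded entities)
  val idsInAtomicOutput   = reIdentified.keys.collect().toSet
  val idsInShreddedOutput = reIdentified.keys.collect().toSet

  // With the persisted RDD both outputs agree; with recomputation they could diverge,
  // leaving shredded entities whose event id matches no atomic event (orphans)
  assert(idsInAtomicOutput == idsInShreddedOutput)

  spark.stop()
}
```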
@@ -17,6 +17,8 @@ package snowplow.storage.spark

 import java.io.{BufferedWriter, File, FileWriter, IOException}

+import org.apache.commons.io.filefilter.IOFileFilter
+
 import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.util.Random
@@ -81,14 +83,20 @@ object ShredJobSpec {
     }
   }

+  /** Ignore empty files on output (necessary since https://github.com/snowplow/snowplow-rdb-loader/issues/142) */
+  val NonEmpty = new org.apache.commons.io.filefilter.IOFileFilter {
+    def accept(file: File): Boolean = file.length() > 1L
+    def accept(dir: File, name: String): Boolean = true
+  }
+
   /**
    * Recursively list files in a given path, excluding the supplied paths.
    * @param root A root filepath
    * @param exclusions A list of paths to exclude from the listing
    * @return the list of files contained in the root, minus the exclusions
    */
   def listFilesWithExclusions(root: File, exclusions: List[String]): List[String] =
-    FileUtils.listFiles(root, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE)
+    FileUtils.listFiles(root, NonEmpty, TrueFileFilter.TRUE)
       .asScala
       .toList
       .map(_.getCanonicalPath)
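With the synthetic duplicates written out as a separate, often empty output, the test helper now skips zero-length part files when listing the shredder's results; otherwise every spec would pick up spurious empty files. A small, self-contained illustration of the filter's behaviour (the directory and file names are made up for the example, and the filter is re-declared locally):

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import scala.collection.JavaConverters._

import org.apache.commons.io.FileUtils
import org.apache.commons.io.filefilter.{IOFileFilter, TrueFileFilter}

object NonEmptyFilterSketch extends App {
  // Same shape as the NonEmpty filter above: accept only files longer than one byte,
  // descend into every directory
  val nonEmpty = new IOFileFilter {
    def accept(file: File): Boolean = file.length() > 1L
    def accept(dir: File, name: String): Boolean = true
  }

  val dir = Files.createTempDirectory("shred-output").toFile
  FileUtils.writeStringToFile(new File(dir, "part-00000"), "event\tdata\n", StandardCharsets.UTF_8)
  new File(dir, "part-00001").createNewFile() // empty part file
  new File(dir, "_SUCCESS").createNewFile()   // zero-byte marker

  val listed = FileUtils.listFiles(dir, nonEmpty, TrueFileFilter.TRUE).asScala.map(_.getName).toSet
  println(listed) // Set(part-00000) — empty files are ignored
}
```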
