RDB Shredder: optimize DAG (close #181)
chuwy committed Oct 30, 2020
1 parent e1eb880 commit 74e5697
Showing 7 changed files with 49 additions and 61 deletions.
@@ -116,12 +116,14 @@ object ShredJob extends SparkJob {

private[spark] val classesToRegister: Array[Class[_]] = Array(
classOf[Array[String]],
classOf[Array[UUID]],
classOf[SchemaKey],
classOf[SelfDescribingData[_]],
classOf[Event],
classOf[Hierarchy],
classOf[FinalRow],
classOf[Instant],
classOf[UUID],
classOf[com.snowplowanalytics.iglu.core.SchemaVer$Full],
classOf[io.circe.JsonObject$LinkedHashMapJsonObject],
classOf[io.circe.Json$JObject],
@@ -142,7 +144,9 @@ object ShredJob extends SparkJob {
classOf[scala.collection.immutable.Map$EmptyMap$],
classOf[scala.collection.immutable.Set$EmptySet$],
classOf[org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage],
classOf[scala.math.Ordering$$anon$4],
classOf[org.apache.spark.sql.catalyst.InternalRow],
Class.forName("com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anon$1"),
classOf[org.apache.spark.sql.execution.datasources.WriteTaskResult],
classOf[org.apache.spark.sql.execution.datasources.ExecutedWriteSummary],
classOf[org.apache.spark.sql.execution.datasources.BasicWriteTaskStats]
@@ -388,66 +392,54 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)
val good = common
.flatMap { shredded => shredded.toOption }
.groupBy { s => (s.event_id, s.event_fingerprint.getOrElse(UUID.randomUUID().toString)) }
.map { case (_, s) =>
.flatMap { case (_, s) =>
val first = s.minBy(_.etl_tstamp)
val absent = dedupeCrossBatch(first, batchTimestamp, DuplicateStorageSingleton.get(eventsManifest))
(first, absent)
recordPayload(first.inventory.map(_.schemaKey))
dedupeCrossBatch(first, batchTimestamp, DuplicateStorageSingleton.get(eventsManifest)) match {
case Right(unique) if unique => Some(Right(first))
case Right(_) => None
case Left(badRow) => Some(Left(badRow))
}
}
.setName("good")

// Deduplication operation succeeded
val dupeSucceeded = good
.filter {
case (_, Right(r)) => r
case (_, Left(_)) => false
}
.map { case (event, _) =>
recordPayload(event.inventory.map(_.schemaKey))
event
}
.cache()

// Count synthetic duplicates, defined as events with the same id but different fingerprints
val syntheticDupes = dupeSucceeded
.groupBy(_.event_id)
val syntheticDupes = good
.flatMap {
case Right(e) => Some((e.event_id, 1L))
case Left(_) => None
}
.reduceByKey(_ + _)
.flatMap {
case (eventId, vs) if vs.size > 1 => Some((eventId, ()))
case (id, count) if count > 1 => Some(id)
case _ => None
}

val syntheticDupesBroadcasted = sc.broadcast(syntheticDupes.collect().toSet)

// Join the properly-formed events with the synthetic duplicates, generate a new event ID for
// those that are synthetic duplicates
val identifiedSyntheticDupes = dupeSucceeded
.map(event => event.event_id -> event)
.leftOuterJoin(syntheticDupes)
.setName("identifiedSyntheticDupes")
.cache()

val uniqueGood = identifiedSyntheticDupes.flatMap {
case (_, (event, None)) => Some(event)
case _ => None
}.setName("uniqueGood")

// Avoid recomputing UUID at all costs in order to not create orphan shredded entities
val syntheticDupedGood = identifiedSyntheticDupes.flatMap {
case (_, (event, Some(_))) =>
val newEventId = UUID.randomUUID()
val newContext = SelfDescribingData(DuplicateSchema, json"""{"originalEventId":${event.event_id}}""")
val updatedContexts = newContext :: event.derived_contexts.data
Some(event.copy(event_id = newEventId, derived_contexts = Contexts(updatedContexts)))
case _ =>
None
}.persist(StorageLevel.MEMORY_AND_DISK_SER).setName("syntheticDupedGood")

val withSyntheticDupes = (uniqueGood ++ syntheticDupedGood)
.map(shred).cache().setName("withSyntheticDupes")
val shredded = good.map { e =>
e.flatMap { event =>
val isSyntheticDupe = syntheticDupesBroadcasted.value.contains(event.event_id)
val updated = if (isSyntheticDupe) {
val newContext = SelfDescribingData(DuplicateSchema, json"""{"originalEventId":${event.event_id}}""")
val updatedContexts = newContext :: event.derived_contexts.data
val newEventId = UUID.randomUUID()
event.copy(event_id = newEventId, derived_contexts = Contexts(updatedContexts))
} else event
shred(updated)
}
}.cache()

val goodWithSyntheticDupes = withSyntheticDupes.flatMap(_.toOption)
val shreddedGood = shredded.flatMap(_.toOption)

// Ready the events for database load
val events = goodWithSyntheticDupes.map(_.atomic)
val events = shreddedGood.map(_.atomic)

// Update the shredded JSONs with the new deduplicated event IDs and stringify
val shredded = goodWithSyntheticDupes.flatMap(_.shredded)
val shreddedData = shreddedGood.flatMap(_.shredded)

// Write as strings to `atomic-events` directory
spark.createDataFrame(events, StructType(StructField("_", StringType, true) :: Nil))
@@ -457,20 +449,15 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)

// Final output
shredConfig.storage.flatMap(_.blacklistTabular).map(_.nonEmpty) match {
case Some(true) | None => writeShredded(shredded.flatMap(_.json), true)
case Some(true) | None => writeShredded(shreddedData.flatMap(_.json), true)
case Some(false) => ()
}
writeShredded(shredded.flatMap(_.tabular), false)
writeShredded(shreddedData.flatMap(_.tabular), false)

// Deduplication operation failed due to DynamoDB
val dupeFailed = good.flatMap {
case (_, Left(m)) => Some(Row(m.compact))
case _ => None
}
// Data that failed TSV transformation
val shreddedBad = withSyntheticDupes.flatMap(_.swap.toOption.map(bad => Row(bad.compact)))
val shreddedBad = shredded.flatMap(_.swap.toOption.map(bad => Row(bad.compact)))

spark.createDataFrame(bad ++ dupeFailed ++ shreddedBad, StructType(StructField("_", StringType, true) :: Nil))
spark.createDataFrame(bad ++ shreddedBad, StructType(StructField("_", StringType, true) :: Nil))
.write
.mode(SaveMode.Overwrite)
.text(shredConfig.badFolder)
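
The reworked DAG above replaces the old dupeSucceeded RDD, the leftOuterJoin against synthetic duplicates and the intermediate cached datasets with a single `good` RDD, a broadcast set of duplicated event IDs, and one `shredded` pass. Below is a minimal, self-contained sketch of that flow, assuming Scala 2.12 and Spark's RDD API; `Event`, `BadRow` and `OptimizedDagSketch.run` are hypothetical stand-ins rather than the commit's real types, and the cross-batch DynamoDB check, the originalEventId context and the actual shred step are elided.

```scala
import java.util.UUID

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical, simplified stand-ins for the real Snowplow types used by the job.
object OptimizedDagSketch {
  type BadRow = String
  final case class Event(event_id: UUID, etl_tstamp: Long, event_fingerprint: Option[String])

  def run(sc: SparkContext, common: RDD[Either[BadRow, Event]]): RDD[Either[BadRow, Event]] = {
    // In-batch dedup: keep the earliest event per (id, fingerprint). In the real
    // job the cross-batch check also happens inside this pass, so unique events,
    // discarded duplicates and dedup failures all come out of one flatMap instead
    // of the previous good/dupeSucceeded split.
    val good: RDD[Either[BadRow, Event]] = common
      .flatMap(_.toOption)
      .groupBy(e => (e.event_id, e.event_fingerprint.getOrElse(UUID.randomUUID().toString)))
      .map { case (_, events) => Right(events.minBy(_.etl_tstamp)) }

    // Synthetic duplicates: same event_id under different fingerprints. Counting
    // ids and broadcasting the (expectedly small) set of duplicated ids replaces
    // the previous groupBy + leftOuterJoin over the full dataset.
    val syntheticDupeIds: Set[UUID] = good
      .flatMap {
        case Right(e) => Some((e.event_id, 1L))
        case Left(_)  => None
      }
      .reduceByKey(_ + _)
      .flatMap {
        case (id, count) if count > 1 => Some(id)
        case _                        => None
      }
      .collect()
      .toSet
    val syntheticDupesBroadcasted = sc.broadcast(syntheticDupeIds)

    // One pass over `good`: synthetic duplicates get a fresh event_id (the real
    // job also attaches an originalEventId context and then shreds the event).
    val shredded: RDD[Either[BadRow, Event]] = good.map {
      case Right(event) if syntheticDupesBroadcasted.value.contains(event.event_id) =>
        Right(event.copy(event_id = UUID.randomUUID()))
      case other => other
    }
    shredded.cache()
  }
}
```

Collecting the duplicated ids to the driver and broadcasting them assumes the set of synthetic duplicates is small relative to the batch; that assumption is what lets the join and its extra shuffle and cache stages be dropped from the DAG.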
@@ -30,7 +30,7 @@ object InvalidEnrichedEventsSpec {
val expected = json"""{
"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0",
"data": {
"processor":{"artifact":"snowplow-rdb-shredder","version":"0.16.0"},
"processor":{"artifact":"snowplow-rdb-shredder","version":"0.18.0-rc1"},
"failure":{
"type":"RowDecodingError",
"errors":[
@@ -30,7 +30,7 @@ object InvalidJsonsSpec {
val expected = json"""{
"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0",
"data":{
"processor":{"artifact":"snowplow-rdb-shredder","version":"0.16.0"},
"processor":{"artifact":"snowplow-rdb-shredder","version":"0.18.0-rc1"},
"failure":{
"type":"RowDecodingError",
"errors":[
@@ -31,7 +31,7 @@ object MissingJsonSchemaSpec {
"data":{
"processor":{
"artifact":"snowplow-rdb-shredder",
"version":"0.16.0"
"version":"0.18.0-rc1"
},
"failure":[
{
@@ -30,9 +30,9 @@ object NotEnrichedEventsSpec {
)

val expected = List(
json"""{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-rdb-shredder","version":"0.16.0"},"failure":{"type":"NotTSV"},"payload":""}}""",
json"""{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-rdb-shredder","version":"0.16.0"},"failure":{"type":"NotTSV"},"payload":"NOT AN ENRICHED EVENT"}}""",
json"""{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-rdb-shredder","version":"0.16.0"},"failure":{"type":"NotTSV"},"payload":"2012-05-21 07:14:47 FRA2 3343 83.4.209.35 GET d3t05xllj8hhgj.cloudfront.net"}}"""
json"""{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-rdb-shredder","version":"0.18.0-rc1"},"failure":{"type":"NotTSV"},"payload":""}}""",
json"""{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-rdb-shredder","version":"0.18.0-rc1"},"failure":{"type":"NotTSV"},"payload":"NOT AN ENRICHED EVENT"}}""",
json"""{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-rdb-shredder","version":"0.18.0-rc1"},"failure":{"type":"NotTSV"},"payload":"2012-05-21 07:14:47 FRA2 3343 83.4.209.35 GET d3t05xllj8hhgj.cloudfront.net"}}"""
).map(_.noSpaces)
}

@@ -30,7 +30,7 @@ object SchemaValidationFailedSpec {
val expected = json"""{
"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_iglu_error/jsonschema/2-0-0",
"data":{
"processor":{"artifact":"snowplow-rdb-shredder","version":"0.16.0"},
"processor":{"artifact":"snowplow-rdb-shredder","version":"0.18.0-rc1"},
"failure":[
{"schemaKey":"iglu:com.snowplowanalytics.snowplow/link_click/jsonschema/1-0-0","error":{"error":"ValidationError","dataReports":[{"message":"$$.targetUrl: is missing but it is required","path":"$$","keyword":"required","targets":["targetUrl"]}]}}
],
@@ -244,8 +244,9 @@ class CrossBatchDeduplicationSpec extends Specification with ShredJobSpec {
val Some((lines, f)) = readPartFile(dirs.output, "atomic-events")
expectedFiles += f
val eventIds = lines.map(_.split("\t").apply(6))
eventIds mustEqual
eventIds must containTheSameElementsAs(
Seq(CrossBatchDeduplicationSpec.uniqueUuid, CrossBatchDeduplicationSpec.inbatchDupeUuid).map(_.toString)
)
}
"shred additional contexts into their appropriate path" in {
val Some((contexts, f)) = readPartFile(dirs.output,