RDB Shredder: use single etl_tstamp for cross-batch deduplication (close
chuwy committed Oct 30, 2020
1 parent dcc0ed0 commit e1eb880
Showing 1 changed file with 17 additions and 3 deletions.
@@ -274,15 +274,16 @@ object ShredJob extends SparkJob {
    * If provisioned throughput exception happened - interrupt whole job
    * If other runtime exception happened - failure is returned to be used as bad row
    * @param event whole enriched event with possibly faked fingerprint
+   * @param tstamp the ETL tstamp, the earliest timestamp in the batch
    * @param duplicateStorage object dealing with possible duplicates
    * @return boolean inside validation, denoting presence or absence of event in storage
    */
   @throws[UnexpectedEtlException]
-  def dedupeCrossBatch(event: Event, duplicateStorage: Option[EventsManifest]): Either[BadRow, Boolean] = {
+  def dedupeCrossBatch(event: Event, tstamp: Instant, duplicateStorage: Option[EventsManifest]): Either[BadRow, Boolean] = {
     (event, duplicateStorage) match {
       case (_, Some(storage)) =>
         try {
-          Right(storage.put(event.event_id, event.event_fingerprint.getOrElse(UUID.randomUUID().toString), event.etl_tstamp.getOrElse(StartTime)))
+          Right(storage.put(event.event_id, event.event_fingerprint.getOrElse(UUID.randomUUID().toString), tstamp))
         } catch {
           case e: ProvisionedThroughputExceededException =>
             throw UnexpectedEtlException(e.toString)
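
For readers outside the codebase, the manifest contract this relies on can be sketched with an in-memory stand-in. InMemoryManifest below is hypothetical: only the put(eventId, fingerprint, etlTstamp): Boolean shape and its "absent means new" semantics are taken from the code and the @return doc above; the real EventsManifest is remote storage, as the ProvisionedThroughputExceededException handling suggests.

import java.time.Instant
import java.util.UUID
import scala.collection.mutable

// Hypothetical in-memory stand-in for EventsManifest. put returns true when
// the (event_id, fingerprint) pair was absent, i.e. the event has not been
// seen in this or any previous batch.
final class InMemoryManifest {
  private val seen = mutable.Set.empty[(UUID, String)]
  def put(eventId: UUID, fingerprint: String, etlTstamp: Instant): Boolean =
    seen.add((eventId, fingerprint))
}

object DedupeCrossBatchSketch extends App {
  val manifest = new InMemoryManifest
  val batchTimestamp = Instant.parse("2020-10-30T00:00:00Z")
  val id = UUID.randomUUID()

  println(manifest.put(id, "fp-1", batchTimestamp)) // true: first sighting, event is kept
  println(manifest.put(id, "fp-1", batchTimestamp)) // false: cross-batch duplicate
}

The visible effect of the change is that every event in a run is now recorded with one shared tstamp (the batch's earliest etl_tstamp) rather than with its own event.etl_tstamp.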
@@ -365,6 +366,19 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)
       .setName("common")
       .cache()
 
+    // Find the earliest timestamp in the batch. Used only if cross-batch deduplication is enabled
+    val batchTimestamp = eventsManifest match {
+      case Some(_) =>
+        common.takeOrdered(1)(new Ordering[Either[BadRow, Event]] {
+          def compare(x: Either[BadRow, Event], y: Either[BadRow, Event]): Int =
+            (x.map(_.etl_tstamp), y.map(_.etl_tstamp)) match {
+              case (Right(Some(xt)), Right(Some(yt))) => Ordering[Instant].compare(xt, yt)
+              case _ => 0
+            }
+        }).headOption.flatMap(_.toOption).flatMap(_.etl_tstamp).getOrElse(StartTime)
+      case None => StartTime
+    }
+
     // Handling of malformed rows; drop good, turn malformed into `BadRow`
     val bad = common.flatMap { shredded => shredded.swap.toOption.map(bad => Row(bad.compact)) }
 
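The selection logic can be sketched on a plain List instead of a Spark RDD. The flatMap/minOption route below is an assumed equivalent of the intent of the custom Ordering, under which bad rows and events without an etl_tstamp compare as equal and so never beat a real timestamp; StartTime here is a stand-in for the job-start fallback used by the real code.

import java.time.Instant

object BatchTimestampSketch extends App {
  // Stand-in for the job-start fallback in the real code
  val StartTime: Instant = Instant.now()

  // Each element mirrors Either[BadRow, Event]: Left is a bad row,
  // Right carries the event's optional etl_tstamp
  val common: List[Either[String, Option[Instant]]] = List(
    Right(Some(Instant.parse("2020-10-30T12:00:00Z"))),
    Right(Some(Instant.parse("2020-10-30T09:00:00Z"))),
    Left("malformed row"),
    Right(None)
  )

  // Keep only well-formed timestamps and take the minimum (Scala 2.13 minOption)
  val batchTimestamp: Instant =
    common.flatMap(_.toOption).flatten.minOption.getOrElse(StartTime)

  println(batchTimestamp) // 2020-10-30T09:00:00Z
}

Note that in the RDD version, if the element takeOrdered(1) surfaces happens to be a bad row or lacks an etl_tstamp, the headOption.flatMap chain yields None and batchTimestamp falls back to StartTime.
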
@@ -376,7 +390,7 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)
       .groupBy { s => (s.event_id, s.event_fingerprint.getOrElse(UUID.randomUUID().toString)) }
       .map { case (_, s) =>
         val first = s.minBy(_.etl_tstamp)
-        val absent = dedupeCrossBatch(first, DuplicateStorageSingleton.get(eventsManifest))
+        val absent = dedupeCrossBatch(first, batchTimestamp, DuplicateStorageSingleton.get(eventsManifest))
         (first, absent)
       }
       .setName("good")
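
Putting both stages together, the deduplication flow reads roughly as below on plain collections. MiniEvent and manifestPut are simplified, hypothetical stand-ins for the analytics-sdk Event and the manifest; the grouping key and the minBy choice mirror the diff above.

import java.time.Instant
import java.util.UUID
import scala.collection.mutable

final case class MiniEvent(eventId: UUID, fingerprint: Option[String], etlTstamp: Instant)

object DedupFlowSketch extends App {
  // One-method stand-in for the cross-batch manifest: true means "was absent"
  val seen = mutable.Set.empty[(UUID, String)]
  def manifestPut(id: UUID, fp: String, tstamp: Instant): Boolean = seen.add((id, fp))

  val batchTimestamp = Instant.parse("2020-10-30T09:00:00Z")
  val id = UUID.randomUUID()
  val good = List(
    MiniEvent(id, Some("fp"), Instant.parse("2020-10-30T10:00:00Z")),
    MiniEvent(id, Some("fp"), Instant.parse("2020-10-30T11:00:00Z")) // in-batch duplicate
  )

  val deduped = good
    // Missing fingerprints get a random key, so they are never grouped together
    .groupBy(e => (e.eventId, e.fingerprint.getOrElse(UUID.randomUUID().toString)))
    .map { case (_, es) =>
      // In-batch stage: keep the earliest copy of each (event_id, fingerprint)
      val first = es.minBy(_.etlTstamp)
      // Cross-batch stage: check the manifest with the shared batch timestamp,
      // not the event's own etl_tstamp
      val absent = manifestPut(first.eventId, first.fingerprint.getOrElse(""), batchTimestamp)
      (first, absent)
    }

  deduped.foreach { case (e, absent) => println(s"${e.etlTstamp} absent=$absent") }
}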
