RDB Shredder: add min and max timestamps of events for the batch to the SQS message with shredded types (close #275)
benjben authored and chuwy committed Jan 20, 2021
1 parent 72ba4dd commit d1fe634
Showing 4 changed files with 82 additions and 10 deletions.
@@ -94,16 +94,17 @@ object Discovery {
.withMessageGroupId("shredding")

val (bucket, key) = S3.splitS3Key(message.base.withKey(FinalKeyName))
-Either.catchNonFatal(s3Client.putObject(bucket, key, message.selfDescribingData.asString)) match {

+Either.catchNonFatal(sqsClient.sendMessage(sqsMessage)) match {
case Left(e) =>
throw new RuntimeException(s"RDB Shredder could not write ${message.base.withKey(FinalKeyName)}", e)
throw new RuntimeException(s"Could not send shredded types ${message.selfDescribingData.asString} to SQS for ${message.base}", e)
case _ =>
()
}

-Either.catchNonFatal(sqsClient.sendMessage(sqsMessage)) match {
+Either.catchNonFatal(s3Client.putObject(bucket, key, message.selfDescribingData.asString)) match {
case Left(e) =>
throw new RuntimeException(s"RDB Shredder could not send shredded types [${message.selfDescribingData.asString}] to SQS", e)
throw new RuntimeException(s"Could send shredded types ${message.selfDescribingData.asString} to SQS but could not write ${message.base.withKey(FinalKeyName)}", e)
case _ =>
()
}
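
Editor's note: this hunk swaps the order of the two side effects, so the SQS message carrying the shredded types (and now the batch timestamps) is sent before the manifest object is written to S3, and the error messages are updated to match. As a minimal, self-contained illustration of the catch-and-rethrow pattern both blocks use (the sendToSqs stub below is invented, not part of the codebase):

import cats.syntax.either._

// Hypothetical stand-in for a side effect that may throw
def sendToSqs(): Unit = throw new IllegalStateException("SQS unavailable")

// catchNonFatal turns a thrown non-fatal exception into a Left,
// which is then rethrown with extra context, as in the hunk above
Either.catchNonFatal(sendToSqs()) match {
  case Left(e) => throw new RuntimeException("Could not send shredded types to SQS", e)
  case Right(_) => ()
}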
@@ -34,7 +34,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.S3.Folder

import com.snowplowanalytics.snowplow.shredder.Discovery.MessageProcessor
import com.snowplowanalytics.snowplow.shredder.transformation.{FinalRow, EventUtils}
-import com.snowplowanalytics.snowplow.shredder.spark.{singleton, Sink, ShreddedTypesAccumulator}
+import com.snowplowanalytics.snowplow.shredder.spark.{singleton, Sink, ShreddedTypesAccumulator, TimestampsAccumulator}

/**
* The Snowplow Shred job, written in Spark.
@@ -49,6 +49,10 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: CliConfig) exten
val shreddedTypesAccumulator = new ShreddedTypesAccumulator
sc.register(shreddedTypesAccumulator)

+// Accumulator to track min and max timestamps
+val timestampsAccumulator = new TimestampsAccumulator
+sc.register(timestampsAccumulator)

/** Check if `shredType` should be transformed into TSV */
def isTabular(shredType: SchemaKey): Boolean =
Common.isTabular(shredConfig.config.formats)(shredType)
@@ -99,7 +103,6 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: CliConfig) exten
val first = s.minBy(_.etl_tstamp)
Deduplication.crossBatch(first, batchTimestamp, DuplicateStorageSingleton.get(eventsManifest)) match {
case Right(unique) if unique =>
-ShreddedTypesAccumulator.recordShreddedType(shreddedTypesAccumulator, isTabular)(first.inventory.map(_.schemaKey))
Some(Right(first))
case Right(_) => None
case Left(badRow) => Some(Left(badRow))
@@ -126,6 +129,8 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: CliConfig) exten
// those that are synthetic duplicates
val shredded = good.map { e =>
e.flatMap { event =>
+ShreddedTypesAccumulator.recordShreddedType(shreddedTypesAccumulator, isTabular)(event.inventory.map(_.schemaKey))
+timestampsAccumulator.add(event)
val isSyntheticDupe = syntheticDupesBroadcasted.value.contains(event.event_id)
val withDupeContext = if (isSyntheticDupe) Deduplication.withSynthetic(event) else event
FinalRow.shred(shredConfig.igluConfig, isTabular, atomicLengths)(withDupeContext)
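
Editor's note: the per-event bookkeeping now happens in the shredding map, where each good event records its shredded types and its collector_tstamp before being shredded. A rough sketch, not part of this diff, of how the driver can read the timestamps back once a Spark action has run (how the value is threaded into the SQS message built in Discovery is not shown in this hunk):

// Sketch only: reading the accumulator on the driver after an action has run
val batchTimestamps: Option[TimestampsAccumulator.BatchTimestamps] = timestampsAccumulator.value
batchTimestamps.foreach { ts =>
  // ts.min / ts.max are the earliest and latest collector_tstamp seen in the batch
  println(s"Batch events span ${ts.min} .. ${ts.max}")
}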
@@ -0,0 +1,70 @@
/*
 * Copyright (c) 2020-2020 Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Apache License Version 2.0,
 * and you may not use this file except in compliance with the Apache License Version 2.0.
 * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the Apache License Version 2.0 is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
 */
package com.snowplowanalytics.snowplow.shredder.spark

import java.time.Instant

import org.apache.spark.util.AccumulatorV2
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

import TimestampsAccumulator.BatchTimestamps

class TimestampsAccumulator extends AccumulatorV2[Event, Option[BatchTimestamps]] {

  private var accum: Option[BatchTimestamps] = None

  private def mergeWith(other: Option[BatchTimestamps]): Option[BatchTimestamps] =
    (accum, other) match {
      case (Some(t1), Some(t2)) => Some(TimestampsAccumulator.merge(t1, t2))
      case (Some(t1), None) => Some(t1)
      case (None, Some(t2)) => Some(t2)
      case (None, None) => None
    }

  def merge(other: AccumulatorV2[Event, Option[BatchTimestamps]]): Unit = other match {
    case o: TimestampsAccumulator =>
      accum = mergeWith(o.accum)
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  def isZero: Boolean = accum.isEmpty

  def copy(): AccumulatorV2[Event, Option[BatchTimestamps]] = {
    val newAcc = new TimestampsAccumulator
    accum.synchronized {
      newAcc.accum = mergeWith(newAcc.accum)
    }
    newAcc
  }

  def value = accum

  def add(event: Event): Unit = {
    accum = mergeWith(Some(BatchTimestamps(event.collector_tstamp, event.collector_tstamp)))
  }

  def reset(): Unit = {
    accum = None
  }
}

object TimestampsAccumulator {
  case class BatchTimestamps(min: Instant, max: Instant)

  def merge(t1: BatchTimestamps, t2: BatchTimestamps): BatchTimestamps = {
    val min = if (t1.min.isBefore(t2.min)) t1.min else t2.min
    val max = if (t1.max.isAfter(t2.max)) t1.max else t2.max
    BatchTimestamps(min, max)
  }
}
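
Editor's note: a small, self-contained sketch of the merge semantics (the timestamp values below are invented for illustration). Merging two partial results keeps the earliest min and the latest max, which is what makes the accumulator safe to combine across Spark partitions:

import java.time.Instant

import com.snowplowanalytics.snowplow.shredder.spark.TimestampsAccumulator
import com.snowplowanalytics.snowplow.shredder.spark.TimestampsAccumulator.BatchTimestamps

// Partial results, e.g. from two different partitions
val left  = BatchTimestamps(Instant.parse("2021-01-20T10:00:00Z"), Instant.parse("2021-01-20T10:05:00Z"))
val right = BatchTimestamps(Instant.parse("2021-01-20T09:55:00Z"), Instant.parse("2021-01-20T10:02:00Z"))

// Earliest min and latest max win
val merged = TimestampsAccumulator.merge(left, right)
// merged == BatchTimestamps(2021-01-20T09:55:00Z, 2021-01-20T10:05:00Z)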
@@ -35,7 +35,6 @@ class AccumulatorSpec extends Specification with ShredJobSpec {
"return the list of shredded types with their format without --target" in {
val expected = Set(
ShreddedType(SchemaKey("org.schema", "WebPage" , "jsonschema" , SchemaVer.Full(1,0,0)), Format.JSON),
ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV),
ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "link_click", "jsonschema", SchemaVer.Full(1,0,0)), Format.JSON)
)
val actual = runShredJob(inputEvent).types.toSet
@@ -45,7 +44,6 @@ class AccumulatorSpec extends Specification with ShredJobSpec {
"return the list of shredded types with their format with --target and no blacklist" in {
val expected = Set(
ShreddedType(SchemaKey("org.schema", "WebPage" , "jsonschema" , SchemaVer.Full(1,0,0)), Format.TSV),
ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV),
ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "link_click", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV)
)
val actual = runShredJob(inputEvent, false, true).types.toSet
@@ -56,7 +54,6 @@ class AccumulatorSpec extends Specification with ShredJobSpec {
val linkClickSchema = SchemaCriterion("com.snowplowanalytics.snowplow", "link_click", "jsonschema", 1)
val expected = Set(
ShreddedType(SchemaKey("org.schema", "WebPage" , "jsonschema" , SchemaVer.Full(1,0,0)), Format.TSV),
ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV),
ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "link_click", "jsonschema", SchemaVer.Full(1,0,0)), Format.JSON)
)
val actual = runShredJob(inputEvent, false, true, List(linkClickSchema)).types.toSet
@@ -67,7 +64,6 @@ class AccumulatorSpec extends Specification with ShredJobSpec {
val randomSchema = SchemaCriterion("foo", "bar", "jsonschema", 1)
val expected = Set(
ShreddedType(SchemaKey("org.schema", "WebPage" , "jsonschema" , SchemaVer.Full(1,0,0)), Format.TSV),
ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV),
ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "link_click", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV)
)
val actual = runShredJob(inputEvent, false, true, List(randomSchema)).types.toSet
