RDB Shredder: send shredding info to SQS when it's done (close #200)
benjben authored and chuwy committed Jan 19, 2021
1 parent 4aaa47b commit fb650c5
Showing 10 changed files with 291 additions and 18 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -101,6 +101,7 @@ lazy val shredder = project.in(file("modules/shredder"))
libraryDependencies ++= Seq(
// Java
Dependencies.dynamodb,
Dependencies.sqs,
// Scala
Dependencies.decline,
Dependencies.eventsManifest,
@@ -39,7 +39,7 @@ sealed trait LoaderMessage {
object LoaderMessage {

val ShreddingCompleteKey: SchemaKey =
SchemaKey("com.snowplowanalytics.snowplow.storage.rdbloader", "shredding_complete", "jsonschema", SchemaVer.Full(1,0,0))
SchemaKey("com.snowplowanalytics.snowplow.storage", "shredding_complete", "jsonschema", SchemaVer.Full(1,0,0))

/** Data format for shredded data */
sealed trait Format extends Product with Serializable
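
For reference, a one-line sketch (not part of the diff) of how the updated key renders via SchemaKey's Iglu URI helper, which the shredder already uses for AtomicSchema:

LoaderMessage.ShreddingCompleteKey.toSchemaUri
// "iglu:com.snowplowanalytics.snowplow.storage/shredding_complete/jsonschema/1-0-0"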
@@ -33,7 +33,7 @@ object S3 {
object Folder extends tag.Tagger[S3FolderTag] {

def parse(s: String): Either[String, Folder] = s match {
case _ if !correctlyPrefixed(s) => "Bucket name must start with s3:// prefix".asLeft
case _ if !correctlyPrefixed(s) => s"Bucket name $s doesn't start with s3:// s3a:// or s3n:// prefix".asLeft
case _ if s.length > 1024 => "Key length cannot be more than 1024 symbols".asLeft
case _ => coerce(s).asRight
}
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/

package com.snowplowanalytics.snowplow.rdbloader.common

import org.specs2.mutable.Specification

class S3Spec extends Specification {
"S3.Folder.parse()" should {
"support s3:// prefix" >> {
val folder = "s3://foo/"
S3.Folder.parse(folder) must beRight
}
"support s3a:// prefix" >> {
val folder = "s3a://foo/"
S3.Folder.parse(folder) must beRight
}
"support s3n:// prefix" >> {
val folder = "s3n://foo/"
S3.Folder.parse(folder) must beRight
}
}
}
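
A quick illustration (hypothetical input, not part of the commit) of the sharpened error message from S3.Folder.parse, assuming correctlyPrefixed accepts only the three prefixes named in the message:

S3.Folder.parse("gs://foo/")
// Left("Bucket name gs://foo/ doesn't start with s3:// s3a:// or s3n:// prefix")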
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012-2019 Snowplow Analytics Ltd. All rights reserved.
* Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -42,17 +42,23 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}

// AWS SDK
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException
import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
import com.amazonaws.services.sqs.model.SendMessageRequest
import com.amazonaws.{AmazonClientException, AmazonWebServiceRequest, ClientConfiguration}
import com.amazonaws.retry.RetryPolicy.RetryCondition
import com.amazonaws.retry.{PredefinedBackoffStrategies, RetryPolicy}

// Snowplow
import com.snowplowanalytics.snowplow.analytics.scalasdk.{ Event, ParsingError }
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.Contexts
import com.snowplowanalytics.snowplow.badrows.{ BadRow, Processor, Payload, Failure, FailureDetails }
import com.snowplowanalytics.snowplow.eventsmanifest.{ EventsManifest, EventsManifestConfig }

// Snowplow
import com.snowplowanalytics.iglu.core.SchemaVer
import com.snowplowanalytics.iglu.core.{ SchemaKey, SelfDescribingData }
import com.snowplowanalytics.iglu.core.{ SchemaKey, SchemaVer, SelfDescribingData }
import com.snowplowanalytics.iglu.core.circe.implicits._
import com.snowplowanalytics.iglu.client.{ Client, ClientError }

import rdbloader.common._
import rdbloader.common.LoaderMessage._
import rdbloader.generated.ProjectMetadata

/** Helper methods for the shred job */
@@ -62,6 +68,10 @@ object ShredJob extends SparkJob {

val processor = Processor(ProjectMetadata.name, ProjectMetadata.version)

final val SqsMaxRetries = 10
final val SqsRetryBaseDelay = 1000 // milliseconds
final val SqsRetryMaxDelay = 20 * 1000 // milliseconds

val DuplicateSchema = SchemaKey("com.snowplowanalytics.snowplow", "duplicate", "jsonschema", SchemaVer.Full(1,0,0))

val AtomicSchema = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0))
@@ -137,6 +147,7 @@ object ShredJob extends SparkJob {
Class.forName("scala.math.Ordering$Reverse"),
classOf[org.apache.spark.sql.catalyst.InternalRow],
Class.forName("com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anon$1"),
Class.forName("com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anon$2"),
classOf[org.apache.spark.sql.execution.datasources.WriteTaskResult],
classOf[org.apache.spark.sql.execution.datasources.ExecutedWriteSummary],
classOf[org.apache.spark.sql.execution.datasources.BasicWriteTaskStats]
@@ -149,11 +160,23 @@ object ShredJob extends SparkJob {
.registerKryoClasses(classesToRegister)

def run(spark: SparkSession, args: Array[String]): Unit = {
val jobStartedTimestamp = Instant.now()

val semVer = Semver.decodeSemver(ProjectMetadata.version) match {
case Right(vers) => vers
case Left(failure) => throw new RuntimeException(failure)
}

// Job configuration
val shredConfig = ShredJobConfig
.loadConfigFrom(args)
.valueOr(e => throw FatalEtlError(e))

val shreddedFolder = S3.Folder.parse(shredConfig.outFolder) match {
case Right(folder) => folder
case Left(failure) => throw new RuntimeException(failure)
}

val job = new ShredJob(spark, shredConfig)

val atomicLengths = singleton.IgluSingleton.get(shredConfig.igluConfig).resolver.lookupSchema(AtomicSchema) match { // TODO: retry
@@ -163,6 +186,9 @@ object ShredJob extends SparkJob {
throw new RuntimeException(s"RDB Shredder could not fetch ${AtomicSchema.toSchemaUri} schema at initialization. ${(error: ClientError).show}")
}


val sqsClient: AmazonSQS = createSqsClient()

val eventsManifest: Option[EventsManifestConfig] = shredConfig.duplicateStorageConfig.map { json =>
val config = EventsManifestConfig
.parseJson[Id](singleton.IgluSingleton.get(shredConfig.igluConfig), json)
@@ -171,7 +197,35 @@ object ShredJob extends SparkJob {
config
}

job.run(atomicLengths, eventsManifest)
val shreddedTypes = job.run(atomicLengths, eventsManifest).toList
val jobCompletedTimestamp = Instant.now()
val timestamps = Timestamps(
jobStartedTimestamp,
jobCompletedTimestamp,
None, // TODO: read from events
None // TODO: read from events
)
val processor = LoaderMessage.Processor(ProjectMetadata.shredderName, semVer)
val shreddingComplete = ShreddingComplete(
shreddedFolder,
shreddedTypes,
timestamps,
processor
)

val sqsMessage: SendMessageRequest =
new SendMessageRequest()
.withQueueUrl(shredConfig.storage.messageQueue)
.withMessageBody(shreddingComplete.selfDescribingData.asString)

sqsMessage.setMessageGroupId("shredding")

Either.catchNonFatal(sqsClient.sendMessage(sqsMessage)) match {
case Left(e) =>
throw new RuntimeException(s"RDB Shredder could not send shredded types [$shreddedTypes] to SQS with error [${e.getMessage}]")
case _ =>
()
}
}
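
// Note (assumption, not stated in the diff): MessageGroupId is a FIFO-queue
// parameter, so the queue behind shredConfig.storage.messageQueue is presumably
// a FIFO (".fifo") queue; the fixed group id "shredding" keeps completion
// messages ordered relative to one another.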

/**
@@ -252,7 +306,30 @@ object ShredJob extends SparkJob {
}
case _ => Right(true)
}
}

/** Create SQS client with built-in retry mechanism (jitter) */
private def createSqsClient(): AmazonSQS = {
AmazonSQSClientBuilder
.standard()
.withClientConfiguration(
new ClientConfiguration().withRetryPolicy(
new RetryPolicy(
new RetryCondition {
override def shouldRetry(
originalRequest: AmazonWebServiceRequest,
exception: AmazonClientException,
retriesAttempted: Int
): Boolean =
retriesAttempted < SqsMaxRetries
},
new PredefinedBackoffStrategies.FullJitterBackoffStrategy(SqsRetryBaseDelay, SqsRetryMaxDelay),
SqsMaxRetries,
true
)
)
)
.build
}
}
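
As a reading aid for the retry policy above, a sketch (not part of the commit) of the delay envelope it implies, assuming AWS's standard full-jitter behaviour of sleeping a uniform random time between 0 and min(cap, base * 2^attempt) milliseconds:

(0 until ShredJob.SqsMaxRetries)
  .map(attempt => math.min(ShredJob.SqsRetryMaxDelay.toLong, ShredJob.SqsRetryBaseDelay.toLong << attempt))
// Vector(1000, 2000, 4000, 8000, 16000, 20000, 20000, 20000, 20000, 20000)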

@@ -267,8 +344,19 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)
import singleton._

// Accumulator to track shredded types
val shreddedTypes = new StringSetAccumulator
sc.register(shreddedTypes)
val shreddedTypesSqs = new ShreddedTypesAccumulator
sc.register(shreddedTypesSqs)

/** Save set of shredded types into accumulator, for master to send to SQS */
def recordShreddedType(inventory: Set[SchemaKey]): Unit = {
val withFormat: Set[ShreddedType] =
inventory
.map { schemaKey =>
if (isTabular(schemaKey)) ShreddedType(schemaKey, Format.TSV)
else ShreddedType(schemaKey, Format.JSON)
}
shreddedTypesSqs.add(withFormat)
}

/** Check if `shredType` should be transformed into TSV */
def isTabular(shredType: SchemaKey): Boolean =
@@ -288,7 +376,7 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)
* - writing out JSON contexts as well as properly-formed and malformed events
*/
def run(atomicLengths: Map[String, Int],
eventsManifest: Option[EventsManifestConfig]): Unit = {
eventsManifest: Option[EventsManifestConfig]): Set[ShreddedType] = {
import ShredJob._

def shred(event: Event): Either[BadRow, FinalRow] =
@@ -340,6 +428,7 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)
.groupBy { s => (s.event_id, s.event_fingerprint.getOrElse(UUID.randomUUID().toString)) }
.flatMap { case (_, s) =>
val first = s.minBy(_.etl_tstamp)
recordShreddedType(first.inventory.map(_.schemaKey))
dedupeCrossBatch(first, batchTimestamp, DuplicateStorageSingleton.get(eventsManifest)) match {
case Right(unique) if unique => Some(Right(first))
case Right(_) => None
@@ -408,5 +497,8 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)
.write
.mode(SaveMode.Overwrite)
.text(shredConfig.badFolder)

val shreddedAtomic = ShreddedType(AtomicSchema, Format.TSV)
(shreddedTypesSqs.value + shreddedAtomic).toSet
}
}
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.storage.spark

import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.ShreddedType

import ShreddedTypesAccumulator._

class ShreddedTypesAccumulator extends AccumulatorV2[KeyAccum, KeyAccum] {

private val accum = mutable.Set.empty[ShreddedType]

def merge(other: AccumulatorV2[KeyAccum, KeyAccum]): Unit = other match {
case o: ShreddedTypesAccumulator => accum ++= o.accum
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}

def isZero: Boolean = accum.isEmpty

def copy(): AccumulatorV2[KeyAccum, KeyAccum] = {
val newAcc = new ShreddedTypesAccumulator
accum.synchronized {
newAcc.accum ++= accum
}
newAcc
}

def value = accum

def add(keys: KeyAccum): Unit = {
accum ++= keys
}

def add(keys: Set[ShreddedType]): Unit = {
val mutableSet = mutable.Set(keys.toList: _*)
add(mutableSet)
}

def reset(): Unit = {
accum.clear()
}
}

object ShreddedTypesAccumulator {
type KeyAccum = mutable.Set[ShreddedType]
}
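
A minimal usage sketch of the accumulator, mirroring how ShredJob wires it up: register on the driver, add from task-side code, read the merged value back on the driver. Everything apart from the ShreddedTypesAccumulator API is illustrative: the SchemaKey and the local Spark setup are placeholders.

import org.apache.spark.{SparkConf, SparkContext}
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{Format, ShreddedType}
import com.snowplowanalytics.snowplow.storage.spark.ShreddedTypesAccumulator

val sc  = new SparkContext(new SparkConf().setAppName("accumulator-sketch").setMaster("local[*]"))
val acc = new ShreddedTypesAccumulator
sc.register(acc)                                   // driver side, as in ShredJob's constructor

val key = SchemaKey("com.acme", "example_context", "jsonschema", SchemaVer.Full(1, 0, 0))
acc.add(Set(ShreddedType(key, Format.JSON)))       // normally called from a Spark task, see recordShreddedType

acc.value                                          // driver side, after the job: a mutable.Set of ShreddedType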