From b1632a30c60a8b950fc31d088a3f306c271141fe Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Thu, 26 Nov 2020 22:38:22 +0100 Subject: [PATCH] RDB Shredder: send shredding info to SQS when it's done (close #200) --- build.sbt | 1 + .../rdbloader/common/LoaderMessage.scala | 2 +- .../snowplow/rdbloader/common/S3.scala | 2 +- .../S3Spec.scala | 32 +++++ .../spark/ShredJob.scala | 119 ++++++++++++++++-- .../spark/ShredJobConfig.scala | 13 +- .../spark/ShreddedTypesAccumulator.scala | 61 +++++++++ .../ShredJobSpec.scala | 33 +++-- .../good/AccumulatorSpec.scala | 86 +++++++++++++ project/BuildSettings.scala | 6 + project/Dependencies.scala | 1 + 11 files changed, 334 insertions(+), 22 deletions(-) create mode 100644 modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/S3Spec.scala create mode 100644 modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShreddedTypesAccumulator.scala create mode 100644 modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/good/AccumulatorSpec.scala diff --git a/build.sbt b/build.sbt index 1a81c338a..da650e6b7 100755 --- a/build.sbt +++ b/build.sbt @@ -92,6 +92,7 @@ lazy val shredder = project.in(file("modules/shredder")) libraryDependencies ++= Seq( // Java Dependencies.dynamodb, + Dependencies.sqs, // Scala Dependencies.decline, Dependencies.eventsManifest, diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/LoaderMessage.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/LoaderMessage.scala index c749dd834..299990ee5 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/LoaderMessage.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/LoaderMessage.scala @@ -39,7 +39,7 @@ sealed trait LoaderMessage { object LoaderMessage { val ShreddingCompleteKey: SchemaKey = - SchemaKey("com.snowplowanalytics.snowplow.storage.rdbloader", "shredding_complete", "jsonschema", SchemaVer.Full(1,0,0)) + SchemaKey("com.snowplowanalytics.snowplow.storage", "shredding_complete", "jsonschema", SchemaVer.Full(1,0,0)) /** Data format for shredded data */ sealed trait Format extends Product with Serializable diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/S3.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/S3.scala index e6c7497c0..0921be524 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/S3.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/S3.scala @@ -33,7 +33,7 @@ object S3 { object Folder extends tag.Tagger[S3FolderTag] { def parse(s: String): Either[String, Folder] = s match { - case _ if !correctlyPrefixed(s) => "Bucket name must start with s3:// prefix".asLeft + case _ if !correctlyPrefixed(s) => s"Bucket name $s doesn't start with s3:// s3a:// or s3n:// prefix".asLeft case _ if s.length > 1024 => "Key length cannot be more than 1024 symbols".asLeft case _ => coerce(s).asRight } diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/S3Spec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/S3Spec.scala new file mode 100644 index 000000000..c633d2481 --- /dev/null +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/S3Spec.scala @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ + +package com.snowplowanalytics.snowplow.rdbloader.common + +import org.specs2.mutable.Specification + +class S3Spec extends Specification { + "S3.Folder.parse()" should { + "support s3:// prefix" >> { + val folder = "s3://foo/" + S3.Folder.parse(folder) must beRight + } + "support s3a:// prefix" >> { + val folder = "s3a://foo/" + S3.Folder.parse(folder) must beRight + } + "support s3n:// prefix" >> { + val folder = "s3n://foo/" + S3.Folder.parse(folder) must beRight + } + } +} \ No newline at end of file diff --git a/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala b/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala index 96dbe19a8..85e447118 100644 --- a/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala +++ b/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012-2019 Snowplow Analytics Ltd. All rights reserved. + * Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. @@ -42,17 +42,23 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} // AWS SDK import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException +import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder} +import com.amazonaws.services.sqs.model.SendMessageRequest +import com.amazonaws.{AmazonClientException, AmazonWebServiceRequest, ClientConfiguration} +import com.amazonaws.retry.RetryPolicy.RetryCondition +import com.amazonaws.retry.{PredefinedBackoffStrategies, RetryPolicy} +// Snowplow import com.snowplowanalytics.snowplow.analytics.scalasdk.{ Event, ParsingError } import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.Contexts import com.snowplowanalytics.snowplow.badrows.{ BadRow, Processor, Payload, Failure, FailureDetails } import com.snowplowanalytics.snowplow.eventsmanifest.{ EventsManifest, EventsManifestConfig } - -// Snowplow -import com.snowplowanalytics.iglu.core.SchemaVer -import com.snowplowanalytics.iglu.core.{ SchemaKey, SelfDescribingData } +import com.snowplowanalytics.iglu.core.{ SchemaKey, SchemaVer, SelfDescribingData } +import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.iglu.client.{ Client, ClientError } + import rdbloader.common._ +import rdbloader.common.LoaderMessage._ import rdbloader.generated.ProjectMetadata /** Helpers method for the shred job */ @@ -62,6 +68,10 @@ object ShredJob extends SparkJob { val processor = Processor(ProjectMetadata.name, ProjectMetadata.version) + final val SqsMaxRetries = 10 + final val SqsRetryBaseDelay = 1000 // milliseconds + final val SqsRetryMaxDelay = 20 * 1000 // milliseconds + val DuplicateSchema = SchemaKey("com.snowplowanalytics.snowplow", "duplicate", "jsonschema", SchemaVer.Full(1,0,0)) val AtomicSchema = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)) @@ -137,6 +147,7 @@ object ShredJob extends SparkJob { Class.forName("scala.math.Ordering$Reverse"), classOf[org.apache.spark.sql.catalyst.InternalRow], Class.forName("com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anon$1"), + Class.forName("com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anon$2"), classOf[org.apache.spark.sql.execution.datasources.WriteTaskResult], classOf[org.apache.spark.sql.execution.datasources.ExecutedWriteSummary], classOf[org.apache.spark.sql.execution.datasources.BasicWriteTaskStats] @@ -149,11 +160,23 @@ object ShredJob extends SparkJob { .registerKryoClasses(classesToRegister) def run(spark: SparkSession, args: Array[String]): Unit = { + val jobStartedTimestamp = Instant.now() + + val semVer = Semver.decodeSemver(ProjectMetadata.version) match { + case Right(vers) => vers + case Left(failure) => throw new RuntimeException(failure) + } + // Job configuration val shredConfig = ShredJobConfig .loadConfigFrom(args) .valueOr(e => throw FatalEtlError(e)) + val shreddedFolder = S3.Folder.parse(shredConfig.outFolder) match { + case Right(folder) => folder + case Left(failure) => throw new RuntimeException(failure) + } + val job = new ShredJob(spark, shredConfig) val atomicLengths = singleton.IgluSingleton.get(shredConfig.igluConfig).resolver.lookupSchema(AtomicSchema) match { // TODO: retry @@ -163,6 +186,9 @@ object ShredJob extends SparkJob { throw new RuntimeException(s"RDB Shredder could not fetch ${AtomicSchema.toSchemaUri} schema at initialization. ${(error: ClientError).show}") } + + val sqsClient: AmazonSQS = createSqsClient() + val eventsManifest: Option[EventsManifestConfig] = shredConfig.duplicateStorageConfig.map { json => val config = EventsManifestConfig .parseJson[Id](singleton.IgluSingleton.get(shredConfig.igluConfig), json) @@ -171,7 +197,35 @@ object ShredJob extends SparkJob { config } - job.run(atomicLengths, eventsManifest) + val shreddedTypes = job.run(atomicLengths, eventsManifest).toList + val jobCompletedTimestamp = Instant.now() + val timestamps = Timestamps( + jobStartedTimestamp, + jobCompletedTimestamp, + None, // TODO: read from events + None // TODO: read from events + ) + val processor = LoaderMessage.Processor(ProjectMetadata.shredderName, semVer) + val shreddingComplete = ShreddingComplete( + shreddedFolder, + shreddedTypes, + timestamps, + processor + ) + + val sqsMessage: SendMessageRequest = + new SendMessageRequest() + .withQueueUrl(shredConfig.sqsQueue) + .withMessageBody(shreddingComplete.selfDescribingData.asString) + + sqsMessage.setMessageGroupId("shredding") + + Either.catchNonFatal(sqsClient.sendMessage(sqsMessage)) match { + case Left(e) => + throw new RuntimeException(s"RDB Shredder could not send shredded types [$shreddedTypes] to SQS with error [${e.getMessage}]") + case _ => + () + } } /** @@ -252,7 +306,30 @@ object ShredJob extends SparkJob { } case _ => Right(true) } + } + /** Create SQS client with built-in retry mechanism (jitter) */ + private def createSqsClient(): AmazonSQS = { + AmazonSQSClientBuilder + .standard() + .withClientConfiguration( + new ClientConfiguration().withRetryPolicy( + new RetryPolicy( + new RetryCondition { + override def shouldRetry( + originalRequest: AmazonWebServiceRequest, + exception: AmazonClientException, + retriesAttempted: Int + ): Boolean = + retriesAttempted < SqsMaxRetries + }, + new PredefinedBackoffStrategies.FullJitterBackoffStrategy(SqsRetryBaseDelay, SqsRetryMaxDelay), + SqsMaxRetries, + true + ) + ) + ) + .build } } @@ -270,11 +347,35 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) val shreddedTypes = new StringSetAccumulator sc.register(shreddedTypes) + // Accumulator to track shredded types + val shreddedTypesSqs = new ShreddedTypesAccumulator + sc.register(shreddedTypesSqs) + /** Save set of found shredded types into accumulator if processing manifest is enabled */ def recordPayload(inventory: Set[SchemaKey]): Unit = if (shredConfig.dynamodbManifestTable.isEmpty) () else shreddedTypes.add(inventory.map(_.toSchemaUri)) + /** Save set of shredded types into accumulator, for master to send to SQS */ + def recordShreddedType(inventory: Set[SchemaKey]): Unit = { + val withFormat: Set[ShreddedType] = + inventory + .map { schemaKey => + shredConfig.storage match { + case None => + ShreddedType(schemaKey, Format.JSON) + case Some(storage) => + storage.blacklistTabular.map(_.exists(_.matches(schemaKey))) match { + case Some(true) => + ShreddedType(schemaKey, Format.JSON) + case _ => + ShreddedType(schemaKey, Format.TSV) + } + } + } + shreddedTypesSqs.add(withFormat) + } + /** Check if `shredType` should be transformed into TSV */ def isTabular(shredType: SchemaKey): Boolean = shredConfig.storage.flatMap(_.blacklistTabular) match { @@ -296,7 +397,7 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) * - writing out JSON contexts as well as properly-formed and malformed events */ def run(atomicLengths: Map[String, Int], - eventsManifest: Option[EventsManifestConfig]): Unit = { + eventsManifest: Option[EventsManifestConfig]): Set[ShreddedType] = { import ShredJob._ def shred(event: Event): Either[BadRow, FinalRow] = @@ -349,6 +450,7 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) .flatMap { case (_, s) => val first = s.minBy(_.etl_tstamp) recordPayload(first.inventory.map(_.schemaKey)) + recordShreddedType(first.inventory.map(_.schemaKey)) dedupeCrossBatch(first, batchTimestamp, DuplicateStorageSingleton.get(eventsManifest)) match { case Right(unique) if unique => Some(Right(first)) case Right(_) => None @@ -415,5 +517,8 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) .write .mode(SaveMode.Overwrite) .text(shredConfig.badFolder) + + val shreddedAtomic = ShreddedType(AtomicSchema, Format.TSV) + (shreddedTypesSqs.value + shreddedAtomic).toSet } } diff --git a/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJobConfig.scala b/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJobConfig.scala index f4fc89433..3bbcf6e42 100644 --- a/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJobConfig.scala +++ b/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJobConfig.scala @@ -46,7 +46,8 @@ case class ShredJobConfig(inFolder: String, duplicateStorageConfig: Option[Json], dynamodbManifestTable: Option[String], itemId: Option[String], - storage: Option[StorageTarget]) { + storage: Option[StorageTarget], + sqsQueue: String) { /** Get both manifest table and item id to process */ def getManifestData: Option[(String, String)] = @@ -98,10 +99,14 @@ object ShredJobConfig { "base64-encoded string with single storage target configuration JSON", "t", "target.json") .mapValidated(Base64Json.decode).orNone - val shredJobConfig = (inputFolder, outputFolder, badFolder, igluConfig, duplicateStorageConfig, processingManifestTable, itemId, storageTarget).mapN { - (input, output, bad, iglu, dupeStorage, manifest, itemId, target) => (ShredJobConfig(input, output, bad, iglu, dupeStorage, manifest, itemId, None), target) + val sqsQueue = Opts.option[String]("sqs-queue", + "Name of the SQS queue where to write shredded info for shredder", + metavar = "SQS queue") + + val shredJobConfig = (inputFolder, outputFolder, badFolder, igluConfig, duplicateStorageConfig, processingManifestTable, itemId, storageTarget, sqsQueue).mapN { + (input, output, bad, iglu, dupeStorage, manifest, itemId, target, queue) => (ShredJobConfig(input, output, bad, iglu, dupeStorage, manifest, itemId, None, queue), target) }.validate("--item-id and --processing-manifest-table must be either both provided or both absent") { - case (ShredJobConfig(_, _, _, _, _, manifest, i, _), _) => (manifest.isDefined && i.isDefined) || (manifest.isEmpty && i.isEmpty) + case (ShredJobConfig(_, _, _, _, _, manifest, i, _, _), _) => (manifest.isDefined && i.isDefined) || (manifest.isEmpty && i.isEmpty) case _ => false }.mapValidated { case (config, Some(target)) => diff --git a/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShreddedTypesAccumulator.scala b/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShreddedTypesAccumulator.scala new file mode 100644 index 000000000..d183a6b1d --- /dev/null +++ b/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShreddedTypesAccumulator.scala @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.storage.spark + +import org.apache.spark.util.AccumulatorV2 + +import scala.collection.mutable + +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.ShreddedType + +import ShreddedTypesAccumulator._ + +class ShreddedTypesAccumulator extends AccumulatorV2[KeyAccum, KeyAccum] { + + private val accum = mutable.Set.empty[ShreddedType] + + def merge(other: AccumulatorV2[KeyAccum, KeyAccum]): Unit = other match { + case o: ShreddedTypesAccumulator => accum ++= o.accum + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + def isZero: Boolean = accum.isEmpty + + def copy(): AccumulatorV2[KeyAccum, KeyAccum] = { + val newAcc = new ShreddedTypesAccumulator + accum.synchronized { + newAcc.accum ++= accum + } + newAcc + } + + def value = accum + + def add(keys: KeyAccum): Unit = { + accum ++= keys + } + + def add(keys: Set[ShreddedType]): Unit = { + val mutableSet = mutable.Set(keys.toList: _*) + add(mutableSet) + } + + def reset(): Unit = { + accum.clear() + } +} + +object ShreddedTypesAccumulator { + type KeyAccum = mutable.Set[ShreddedType] +} diff --git a/modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala b/modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala index 85df1dbe1..456885b64 100644 --- a/modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala +++ b/modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala @@ -41,11 +41,13 @@ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods.{compact, parse} import com.snowplowanalytics.iglu.core.SelfDescribingData +import com.snowplowanalytics.iglu.core.SchemaKey import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.eventsmanifest.EventsManifestConfig import com.snowplowanalytics.snowplow.rdbloader.generated.ProjectMetadata +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.ShreddedType // Specs2 import org.specs2.matcher.Matcher @@ -314,13 +316,14 @@ trait ShredJobSpec extends SparkSpec { * Run the shred job with the specified lines as input. * @param lines input lines */ - def runShredJob(lines: Lines, crossBatchDedupe: Boolean = false): Unit = { + def runShredJob(lines: Lines, crossBatchDedupe: Boolean = false): Set[ShreddedType] = { val input = mkTmpFile("input", createParents = true, containing = lines.some) val config = Array( "--input-folder", input.toString(), "--output-folder", dirs.output.toString(), "--bad-folder", dirs.badRows.toString(), - "--iglu-config", igluConfig + "--iglu-config", igluConfig, + "--sqs-queue", "shreddedTypes" ) val (dedupeConfigCli, dedupeConfig) = if (crossBatchDedupe) { @@ -337,18 +340,20 @@ trait ShredJobSpec extends SparkSpec { .fold(e => throw new RuntimeException(s"Cannot parse test configuration: $e"), c => c) val job = new ShredJob(spark, shredJobConfig) - job.run(Map.empty, dedupeConfig) + val result = job.run(Map.empty, dedupeConfig) deleteRecursively(input) + result } - def runShredJobTabular(lines: Lines, crossBatchDedupe: Boolean = false): Unit = { + def runShredJobTabular(lines: Lines, crossBatchDedupe: Boolean = false, tabularBlacklist: Option[List[SchemaKey]] = Some(Nil)): Set[ShreddedType] = { val input = mkTmpFile("input", createParents = true, containing = lines.some) val config = Array( "--input-folder", input.toString(), "--output-folder", dirs.output.toString(), "--bad-folder", dirs.badRows.toString(), "--iglu-config", igluConfigWithLocal, - "--target", storageConfig + "--target", storageConfig(tabularBlacklist), + "--sqs-queue", "shreddedTypes" ) val (dedupeConfigCli, dedupeConfig) = if (crossBatchDedupe) { @@ -365,13 +370,23 @@ trait ShredJobSpec extends SparkSpec { .fold(e => throw new RuntimeException(s"Cannot parse test configuration: $e"), c => c) val job = new ShredJob(spark, shredJobConfig) - job.run(Map.empty, dedupeConfig) + val result = job.run(Map.empty, dedupeConfig) deleteRecursively(input) + result } - val storageConfig = { + def storageConfig(tabularBlacklist: Option[List[SchemaKey]]) = { val encoder = new Base64(true) - new String(encoder.encode("""{ + val blacklistTabular = tabularBlacklist match { + case None => "" + case Some(blacklist) => + val blacklistStr = + blacklist + .map(s => s""""${s.toSchemaUri}"""") + .mkString(",") + s""""blacklistTabular": [ $blacklistStr ],""" + } + new String(encoder.encode(s"""{ "schema": "iglu:com.snowplowanalytics.snowplow.storage/redshift_config/jsonschema/4-0-0", "data": { "name": "AWS Redshift enriched events storage", @@ -392,7 +407,7 @@ trait ShredJobSpec extends SparkSpec { "schema": "atomic", "maxError": 1, "compRows": 20000, - "blacklistTabular": [], + $blacklistTabular "purpose": "ENRICHED_EVENTS" } } """.stripMargin.replaceAll("[\n\r]","").getBytes())) diff --git a/modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/good/AccumulatorSpec.scala b/modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/good/AccumulatorSpec.scala new file mode 100644 index 000000000..81f2549a2 --- /dev/null +++ b/modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/good/AccumulatorSpec.scala @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and + * limitations there under. + */ +package com.snowplowanalytics.snowplow.storage.spark +package good + +import org.specs2.mutable.Specification + +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} + +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.ShreddedType +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.Format + +class AccumulatorSpec extends Specification with ShredJobSpec { + import ShredJobSpec._ + + val inputEvent = Lines( + """snowplowweb web 2014-06-01 14:04:11.639 2014-05-29 18:16:35.000 2014-05-29 18:16:35.967 unstruct 2b1b25a4-c0df-4859-8201-cf21492ad61b 114221 clojure js-2.0.0-M2 clj-0.6.0-tom-0.0.4 hadoop-0.5.0-common-0.4.0 68.42.204.218 1242058182 58df65c46e1ac937 11 437ad25b-2006-455e-b5d8-d664b74df8f3 US MI Holland 49423 42.742294 -86.0661 http://snowplowanalytics.com/blog/ https://www.google.com/ http snowplowanalytics.com 80 /blog/ https www.google.com 80 / search Google {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:org.schema/WebPage/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]} {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/link_click/jsonschema/1-0-0","data":{"targetUrl":"http://snowplowanalytics.com/blog/page2","elementClasses":["next"]}}} Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.114 Safari/537.36 Chrome Chrome Browser WEBKIT en-US 1 1 1 0 1 0 0 0 1 1 24 1241 806 Mac OS Mac OS Apple Inc. America/New_York Computer 0 1440 900 UTF-8 """ + ) + + override def appName = "accumulator" + sequential + "A shredding job" should { + "return the list of shredded types with their format without --target" in { + val expected = Set( + ShreddedType(SchemaKey("org.schema", "WebPage" , "jsonschema" , SchemaVer.Full(1,0,0)), Format.JSON), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "link_click", "jsonschema", SchemaVer.Full(1,0,0)), Format.JSON) + ) + val actual = runShredJob(inputEvent, false) + actual ==== expected + } + + "return the list of shredded types with their format with --target and no blacklist" in { + val expected = Set( + ShreddedType(SchemaKey("org.schema", "WebPage" , "jsonschema" , SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "link_click", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV) + ) + val actual = runShredJobTabular(inputEvent, false, None) + actual ==== expected + } + + "return the list of shredded types with their format with --target and empty blacklist" in { + val expected = Set( + ShreddedType(SchemaKey("org.schema", "WebPage" , "jsonschema" , SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "link_click", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV) + ) + val actual = runShredJobTabular(inputEvent, false, Some(Nil)) + actual ==== expected + } + + "return the list of shredded types with their format with --target and schema in non-empty blacklist" in { + val linkClickSchema = SchemaKey("com.snowplowanalytics.snowplow", "link_click", "jsonschema", SchemaVer.Full(1,0,0)) + val expected = Set( + ShreddedType(SchemaKey("org.schema", "WebPage" , "jsonschema" , SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "link_click", "jsonschema", SchemaVer.Full(1,0,0)), Format.JSON) + ) + val actual = runShredJobTabular(inputEvent, false, Some(List(linkClickSchema))) + actual ==== expected + } + + "return the list of shredded types with their format with --target and schema not in non-empty blacklist" in { + val expected = Set( + ShreddedType(SchemaKey("org.schema", "WebPage" , "jsonschema" , SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "link_click", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV) + ) + val actual = runShredJobTabular(inputEvent, false, Some(List(SchemaKey("foo", "bar", "jsonschema", SchemaVer.Full(1,0,0))))) + actual ==== expected + } + } +} \ No newline at end of file diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index abd1766a2..4bbf51d1b 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -95,6 +95,12 @@ object BuildSettings { case x if x.startsWith("META-INF") => MergeStrategy.discard case x if x.endsWith(".html") => MergeStrategy.discard case x if x.endsWith("package-info.class") => MergeStrategy.first + case x if x.endsWith("customization.config") => MergeStrategy.first + case x if x.endsWith("examples-1.json") => MergeStrategy.first + case x if x.endsWith("paginators-1.json") => MergeStrategy.first + case x if x.endsWith("service-2.json") => MergeStrategy.first + case x if x.endsWith("waiters-2.json") => MergeStrategy.first + case x if x.endsWith("mime.types") => MergeStrategy.first case x if x.endsWith("module-info.class") => MergeStrategy.discard case PathList("com", "google", "common", _) => MergeStrategy.first case PathList("org", "apache", "spark", "unused", _) => MergeStrategy.first diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 60224de5a..51605e6d3 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -88,6 +88,7 @@ object Dependencies { // Java (Shredder) val dynamodb = "com.amazonaws" % "aws-java-sdk-dynamodb" % V.aws + val sqs = "com.amazonaws" % "aws-java-sdk-sqs" % V.aws // Scala (test only) val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % "test"