From fb650c528fb32ed1fbbaf1623a7e410f1de1eccc Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Thu, 26 Nov 2020 22:38:22 +0100 Subject: [PATCH] RDB Shredder: send shredding info to SQS when it's done (close #200) --- build.sbt | 1 + .../rdbloader/common/LoaderMessage.scala | 2 +- .../snowplow/rdbloader/common/S3.scala | 2 +- .../S3Spec.scala | 32 +++++ .../spark/ShredJob.scala | 110 ++++++++++++++++-- .../spark/ShreddedTypesAccumulator.scala | 61 ++++++++++ .../ShredJobSpec.scala | 18 +-- .../good/AccumulatorSpec.scala | 76 ++++++++++++ project/BuildSettings.scala | 6 + project/Dependencies.scala | 1 + 10 files changed, 291 insertions(+), 18 deletions(-) create mode 100644 modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/S3Spec.scala create mode 100644 modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShreddedTypesAccumulator.scala create mode 100644 modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/good/AccumulatorSpec.scala diff --git a/build.sbt b/build.sbt index 5992f9222..16760b070 100755 --- a/build.sbt +++ b/build.sbt @@ -101,6 +101,7 @@ lazy val shredder = project.in(file("modules/shredder")) libraryDependencies ++= Seq( // Java Dependencies.dynamodb, + Dependencies.sqs, // Scala Dependencies.decline, Dependencies.eventsManifest, diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/LoaderMessage.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/LoaderMessage.scala index b4e8fd350..79b3a99ec 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/LoaderMessage.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/LoaderMessage.scala @@ -39,7 +39,7 @@ sealed trait LoaderMessage { object LoaderMessage { val ShreddingCompleteKey: SchemaKey = - SchemaKey("com.snowplowanalytics.snowplow.storage.rdbloader", "shredding_complete", "jsonschema", SchemaVer.Full(1,0,0)) + SchemaKey("com.snowplowanalytics.snowplow.storage", "shredding_complete", "jsonschema", SchemaVer.Full(1,0,0)) /** Data format for shredded data */ sealed trait Format extends Product with Serializable diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/S3.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/S3.scala index e6c7497c0..0921be524 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/S3.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/S3.scala @@ -33,7 +33,7 @@ object S3 { object Folder extends tag.Tagger[S3FolderTag] { def parse(s: String): Either[String, Folder] = s match { - case _ if !correctlyPrefixed(s) => "Bucket name must start with s3:// prefix".asLeft + case _ if !correctlyPrefixed(s) => s"Bucket name $s doesn't start with s3:// s3a:// or s3n:// prefix".asLeft case _ if s.length > 1024 => "Key length cannot be more than 1024 symbols".asLeft case _ => coerce(s).asRight } diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/S3Spec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/S3Spec.scala new file mode 100644 index 000000000..c633d2481 --- /dev/null +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/S3Spec.scala @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved. 
+ * + * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ + +package com.snowplowanalytics.snowplow.rdbloader.common + +import org.specs2.mutable.Specification + +class S3Spec extends Specification { + "S3.Folder.parse()" should { + "support s3:// prefix" >> { + val folder = "s3://foo/" + S3.Folder.parse(folder) must beRight + } + "support s3a:// prefix" >> { + val folder = "s3a://foo/" + S3.Folder.parse(folder) must beRight + } + "support s3n:// prefix" >> { + val folder = "s3n://foo/" + S3.Folder.parse(folder) must beRight + } + } +} \ No newline at end of file diff --git a/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala b/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala index e107047b4..2ed8335b6 100644 --- a/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala +++ b/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShredJob.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012-2019 Snowplow Analytics Ltd. All rights reserved. + * Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. 
@@ -42,17 +42,23 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} // AWS SDK import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException +import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder} +import com.amazonaws.services.sqs.model.SendMessageRequest +import com.amazonaws.{AmazonClientException, AmazonWebServiceRequest, ClientConfiguration} +import com.amazonaws.retry.RetryPolicy.RetryCondition +import com.amazonaws.retry.{PredefinedBackoffStrategies, RetryPolicy} +// Snowplow import com.snowplowanalytics.snowplow.analytics.scalasdk.{ Event, ParsingError } import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.Contexts import com.snowplowanalytics.snowplow.badrows.{ BadRow, Processor, Payload, Failure, FailureDetails } import com.snowplowanalytics.snowplow.eventsmanifest.{ EventsManifest, EventsManifestConfig } - -// Snowplow -import com.snowplowanalytics.iglu.core.SchemaVer -import com.snowplowanalytics.iglu.core.{ SchemaKey, SelfDescribingData } +import com.snowplowanalytics.iglu.core.{ SchemaKey, SchemaVer, SelfDescribingData } +import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.iglu.client.{ Client, ClientError } + import rdbloader.common._ +import rdbloader.common.LoaderMessage._ import rdbloader.generated.ProjectMetadata /** Helpers method for the shred job */ @@ -62,6 +68,10 @@ object ShredJob extends SparkJob { val processor = Processor(ProjectMetadata.name, ProjectMetadata.version) + final val SqsMaxRetries = 10 + final val SqsRetryBaseDelay = 1000 // milliseconds + final val SqsRetryMaxDelay = 20 * 1000 // milliseconds + val DuplicateSchema = SchemaKey("com.snowplowanalytics.snowplow", "duplicate", "jsonschema", SchemaVer.Full(1,0,0)) val AtomicSchema = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)) @@ -137,6 +147,7 @@ object ShredJob extends SparkJob { Class.forName("scala.math.Ordering$Reverse"), classOf[org.apache.spark.sql.catalyst.InternalRow], Class.forName("com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anon$1"), + Class.forName("com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anon$2"), classOf[org.apache.spark.sql.execution.datasources.WriteTaskResult], classOf[org.apache.spark.sql.execution.datasources.ExecutedWriteSummary], classOf[org.apache.spark.sql.execution.datasources.BasicWriteTaskStats] @@ -149,11 +160,23 @@ object ShredJob extends SparkJob { .registerKryoClasses(classesToRegister) def run(spark: SparkSession, args: Array[String]): Unit = { + val jobStartedTimestamp = Instant.now() + + val semVer = Semver.decodeSemver(ProjectMetadata.version) match { + case Right(vers) => vers + case Left(failure) => throw new RuntimeException(failure) + } + // Job configuration val shredConfig = ShredJobConfig .loadConfigFrom(args) .valueOr(e => throw FatalEtlError(e)) + val shreddedFolder = S3.Folder.parse(shredConfig.outFolder) match { + case Right(folder) => folder + case Left(failure) => throw new RuntimeException(failure) + } + val job = new ShredJob(spark, shredConfig) val atomicLengths = singleton.IgluSingleton.get(shredConfig.igluConfig).resolver.lookupSchema(AtomicSchema) match { // TODO: retry @@ -163,6 +186,9 @@ object ShredJob extends SparkJob { throw new RuntimeException(s"RDB Shredder could not fetch ${AtomicSchema.toSchemaUri} schema at initialization. 
${(error: ClientError).show}") } + + val sqsClient: AmazonSQS = createSqsClient() + val eventsManifest: Option[EventsManifestConfig] = shredConfig.duplicateStorageConfig.map { json => val config = EventsManifestConfig .parseJson[Id](singleton.IgluSingleton.get(shredConfig.igluConfig), json) @@ -171,7 +197,35 @@ object ShredJob extends SparkJob { config } - job.run(atomicLengths, eventsManifest) + val shreddedTypes = job.run(atomicLengths, eventsManifest).toList + val jobCompletedTimestamp = Instant.now() + val timestamps = Timestamps( + jobStartedTimestamp, + jobCompletedTimestamp, + None, // TODO: read from events + None // TODO: read from events + ) + val processor = LoaderMessage.Processor(ProjectMetadata.shredderName, semVer) + val shreddingComplete = ShreddingComplete( + shreddedFolder, + shreddedTypes, + timestamps, + processor + ) + + val sqsMessage: SendMessageRequest = + new SendMessageRequest() + .withQueueUrl(shredConfig.storage.messageQueue) + .withMessageBody(shreddingComplete.selfDescribingData.asString) + + sqsMessage.setMessageGroupId("shredding") + + Either.catchNonFatal(sqsClient.sendMessage(sqsMessage)) match { + case Left(e) => + throw new RuntimeException(s"RDB Shredder could not send shredded types [$shreddedTypes] to SQS with error [${e.getMessage}]") + case _ => + () + } } /** @@ -252,7 +306,30 @@ object ShredJob extends SparkJob { } case _ => Right(true) } + } + /** Create SQS client with built-in retry mechanism (jitter) */ + private def createSqsClient(): AmazonSQS = { + AmazonSQSClientBuilder + .standard() + .withClientConfiguration( + new ClientConfiguration().withRetryPolicy( + new RetryPolicy( + new RetryCondition { + override def shouldRetry( + originalRequest: AmazonWebServiceRequest, + exception: AmazonClientException, + retriesAttempted: Int + ): Boolean = + retriesAttempted < SqsMaxRetries + }, + new PredefinedBackoffStrategies.FullJitterBackoffStrategy(SqsRetryBaseDelay, SqsRetryMaxDelay), + SqsMaxRetries, + true + ) + ) + ) + .build } } @@ -267,8 +344,19 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) import singleton._ // Accumulator to track shredded types - val shreddedTypes = new StringSetAccumulator - sc.register(shreddedTypes) + val shreddedTypesSqs = new ShreddedTypesAccumulator + sc.register(shreddedTypesSqs) + + /** Save set of shredded types into accumulator, for master to send to SQS */ + def recordShreddedType(inventory: Set[SchemaKey]): Unit = { + val withFormat: Set[ShreddedType] = + inventory + .map { schemaKey => + if (isTabular(schemaKey)) ShreddedType(schemaKey, Format.TSV) + else ShreddedType(schemaKey, Format.JSON) + } + shreddedTypesSqs.add(withFormat) + } /** Check if `shredType` should be transformed into TSV */ def isTabular(shredType: SchemaKey): Boolean = @@ -288,7 +376,7 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) * - writing out JSON contexts as well as properly-formed and malformed events */ def run(atomicLengths: Map[String, Int], - eventsManifest: Option[EventsManifestConfig]): Unit = { + eventsManifest: Option[EventsManifestConfig]): Set[ShreddedType] = { import ShredJob._ def shred(event: Event): Either[BadRow, FinalRow] = @@ -340,6 +428,7 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) .groupBy { s => (s.event_id, s.event_fingerprint.getOrElse(UUID.randomUUID().toString)) } .flatMap { case (_, s) => val first = s.minBy(_.etl_tstamp) + recordShreddedType(first.inventory.map(_.schemaKey)) dedupeCrossBatch(first, 
batchTimestamp, DuplicateStorageSingleton.get(eventsManifest)) match { case Right(unique) if unique => Some(Right(first)) case Right(_) => None @@ -408,5 +497,8 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig) .write .mode(SaveMode.Overwrite) .text(shredConfig.badFolder) + + val shreddedAtomic = ShreddedType(AtomicSchema, Format.TSV) + (shreddedTypesSqs.value + shreddedAtomic).toSet } } diff --git a/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShreddedTypesAccumulator.scala b/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShreddedTypesAccumulator.scala new file mode 100644 index 000000000..d183a6b1d --- /dev/null +++ b/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/ShreddedTypesAccumulator.scala @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.storage.spark + +import org.apache.spark.util.AccumulatorV2 + +import scala.collection.mutable + +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.ShreddedType + +import ShreddedTypesAccumulator._ + +class ShreddedTypesAccumulator extends AccumulatorV2[KeyAccum, KeyAccum] { + + private val accum = mutable.Set.empty[ShreddedType] + + def merge(other: AccumulatorV2[KeyAccum, KeyAccum]): Unit = other match { + case o: ShreddedTypesAccumulator => accum ++= o.accum + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + def isZero: Boolean = accum.isEmpty + + def copy(): AccumulatorV2[KeyAccum, KeyAccum] = { + val newAcc = new ShreddedTypesAccumulator + accum.synchronized { + newAcc.accum ++= accum + } + newAcc + } + + def value = accum + + def add(keys: KeyAccum): Unit = { + accum ++= keys + } + + def add(keys: Set[ShreddedType]): Unit = { + val mutableSet = mutable.Set(keys.toList: _*) + add(mutableSet) + } + + def reset(): Unit = { + accum.clear() + } +} + +object ShreddedTypesAccumulator { + type KeyAccum = mutable.Set[ShreddedType] +} diff --git a/modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala b/modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala index 731dc84fd..e6ab50e90 100644 --- a/modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala +++ b/modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/ShredJobSpec.scala @@ -14,14 +14,15 @@ */ package com.snowplowanalytics.snowplow.storage.spark -import java.io.{BufferedWriter, FileWriter, File, IOException} +import java.io.{FileWriter, IOException, File, BufferedWriter} import org.apache.commons.io.filefilter.IOFileFilter import scala.collection.JavaConverters._ - import scala.io.Source import scala.util.Random 
+import com.snowplowanalytics.iglu.core.SchemaCriterion + // Commons import org.apache.commons.codec.binary.Base64 import org.apache.commons.io.FileUtils @@ -46,6 +47,7 @@ import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.eventsmanifest.EventsManifestConfig import com.snowplowanalytics.snowplow.rdbloader.generated.ProjectMetadata +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.ShreddedType // Specs2 import org.specs2.matcher.Matcher @@ -209,9 +211,10 @@ object ShredJobSpec { parseCirce(s).map(setter).map(_.noSpaces).getOrElse(s) } - private def storageConfig(tsv: Boolean) = { + private def storageConfig(tsv: Boolean, jsonSchemas: List[SchemaCriterion]) = { val encoder = new Base64(true) val format = if (tsv) "TSV" else "JSON" + val jsonCriterions = jsonSchemas.map(x => s""""${x.asString}"""").mkString(",") val configPlain = s"""|{ |name = "Acme Redshift" |id = "123e4567-e89b-12d3-a456-426655440000" @@ -234,7 +237,7 @@ object ShredJobSpec { | "sshTunnel": null |}, |monitoring = {"snowplow": null, "sentry": null}, - |formats = { "default": "$format", "json": [ ], "tsv": [ ], "skip": [ ] }, + |formats = { "default": "$format", "json": [$jsonCriterions], "tsv": [ ], "skip": [ ] }, |steps = [] |}""".stripMargin new String(encoder.encode(configPlain.getBytes())) @@ -321,14 +324,14 @@ trait ShredJobSpec extends SparkSpec { * Run the shred job with the specified lines as input. * @param lines input lines */ - def runShredJob(lines: Lines, crossBatchDedupe: Boolean = false, tsv: Boolean = false): Unit = { + def runShredJob(lines: Lines, crossBatchDedupe: Boolean = false, tsv: Boolean = false, jsonSchemas: List[SchemaCriterion] = Nil): Set[ShreddedType] = { val input = mkTmpFile("input", createParents = true, containing = lines.some) val config = Array( "--input-folder", input.toString(), "--output-folder", dirs.output.toString(), "--bad-folder", dirs.badRows.toString(), "--iglu-config", igluConfigWithLocal, - "--config", storageConfig(tsv) + "--config", storageConfig(tsv, jsonSchemas) ) val (dedupeConfigCli, dedupeConfig) = if (crossBatchDedupe) { @@ -345,8 +348,9 @@ trait ShredJobSpec extends SparkSpec { .fold(e => throw new RuntimeException(s"Cannot parse test configuration: $e"), c => c) val job = new ShredJob(spark, shredJobConfig) - job.run(Map.empty, dedupeConfig) + val result = job.run(Map.empty, dedupeConfig) deleteRecursively(input) + result } override def afterAll(): Unit = { diff --git a/modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/good/AccumulatorSpec.scala b/modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/good/AccumulatorSpec.scala new file mode 100644 index 000000000..385ca3be2 --- /dev/null +++ b/modules/shredder/src/test/scala/com.snowplowanalytics.snowplow.storage.spark/good/AccumulatorSpec.scala @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the Apache License Version 2.0 for the specific language governing permissions and + * limitations there under. + */ +package com.snowplowanalytics.snowplow.storage.spark +package good + +import org.specs2.mutable.Specification +import com.snowplowanalytics.iglu.core.{SchemaVer, SchemaKey, SchemaCriterion} + +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.ShreddedType +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.Format + +class AccumulatorSpec extends Specification with ShredJobSpec { + import ShredJobSpec._ + + val inputEvent = Lines( + """snowplowweb web 2014-06-01 14:04:11.639 2014-05-29 18:16:35.000 2014-05-29 18:16:35.967 unstruct 2b1b25a4-c0df-4859-8201-cf21492ad61b 114221 clojure js-2.0.0-M2 clj-0.6.0-tom-0.0.4 hadoop-0.5.0-common-0.4.0 68.42.204.218 1242058182 58df65c46e1ac937 11 437ad25b-2006-455e-b5d8-d664b74df8f3 US MI Holland 49423 42.742294 -86.0661 http://snowplowanalytics.com/blog/ https://www.google.com/ http snowplowanalytics.com 80 /blog/ https www.google.com 80 / search Google {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:org.schema/WebPage/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]} {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/link_click/jsonschema/1-0-0","data":{"targetUrl":"http://snowplowanalytics.com/blog/page2","elementClasses":["next"]}}} Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.114 Safari/537.36 Chrome Chrome Browser WEBKIT en-US 1 1 1 0 1 0 0 0 1 1 24 1241 806 Mac OS Mac OS Apple Inc. 
America/New_York Computer 0 1440 900 UTF-8 """ + ) + + override def appName = "accumulator" + sequential + "A shredding job" should { + "return the list of shredded types with their format without --target" in { + val expected = Set( + ShreddedType(SchemaKey("org.schema", "WebPage" , "jsonschema" , SchemaVer.Full(1,0,0)), Format.JSON), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "link_click", "jsonschema", SchemaVer.Full(1,0,0)), Format.JSON) + ) + val actual = runShredJob(inputEvent) + actual ==== expected + } + + "return the list of shredded types with their format with --target and no blacklist" in { + val expected = Set( + ShreddedType(SchemaKey("org.schema", "WebPage" , "jsonschema" , SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "link_click", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV) + ) + val actual = runShredJob(inputEvent, false, true) + actual ==== expected + } + + "return the list of shredded types with their format with --target and schema in non-empty blacklist" in { + val linkClickSchema = SchemaCriterion("com.snowplowanalytics.snowplow", "link_click", "jsonschema", 1) + val expected = Set( + ShreddedType(SchemaKey("org.schema", "WebPage" , "jsonschema" , SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "link_click", "jsonschema", SchemaVer.Full(1,0,0)), Format.JSON) + ) + val actual = runShredJob(inputEvent, false, true, List(linkClickSchema)) + actual ==== expected + } + + "return the list of shredded types with their format with --target and schema not in non-empty blacklist" in { + val randomSchema = SchemaCriterion("foo", "bar", "jsonschema", 1) + val expected = Set( + ShreddedType(SchemaKey("org.schema", "WebPage" , "jsonschema" , SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV), + ShreddedType(SchemaKey("com.snowplowanalytics.snowplow", "link_click", "jsonschema", SchemaVer.Full(1,0,0)), Format.TSV) + ) + val actual = runShredJob(inputEvent, false, true, List(randomSchema)) + actual ==== expected + } + } +} \ No newline at end of file diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index c1dd21d2f..e2f1956b1 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -96,6 +96,12 @@ object BuildSettings { case x if x.startsWith("META-INF") => MergeStrategy.discard case x if x.endsWith(".html") => MergeStrategy.discard case x if x.endsWith("package-info.class") => MergeStrategy.first + case x if x.endsWith("customization.config") => MergeStrategy.first + case x if x.endsWith("examples-1.json") => MergeStrategy.first + case x if x.endsWith("paginators-1.json") => MergeStrategy.first + case x if x.endsWith("service-2.json") => MergeStrategy.first + case x if x.endsWith("waiters-2.json") => MergeStrategy.first + case x if x.endsWith("mime.types") => MergeStrategy.first case x if x.endsWith("module-info.class") => MergeStrategy.discard case PathList("com", "google", "common", _) => MergeStrategy.first case PathList("org", "apache", "spark", "unused", _) => 
MergeStrategy.first diff --git a/project/Dependencies.scala b/project/Dependencies.scala index bc96740c1..ab7a19a80 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -96,6 +96,7 @@ object Dependencies { // Java (Shredder) val dynamodb = "com.amazonaws" % "aws-java-sdk-dynamodb" % V.aws + val sqs = "com.amazonaws" % "aws-java-sdk-sqs" % V.aws // Scala (test only) val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
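
Note: the following standalone sketch condenses the SQS send path added above (retrying client plus `shredding_complete` message publication) into one runnable snippet for reference. It is illustrative only, not part of the patch: the queue URL and the empty `data` payload are placeholders, whereas the real job uses `shredConfig.storage.messageQueue` and the serialised `ShreddingComplete` self-describing JSON.

```scala
// Illustrative sketch of the SQS send path introduced in ShredJob.scala.
// Queue URL and message body are placeholders, not values from this patch.
import com.amazonaws.{AmazonClientException, AmazonWebServiceRequest, ClientConfiguration}
import com.amazonaws.retry.{PredefinedBackoffStrategies, RetryPolicy}
import com.amazonaws.retry.RetryPolicy.RetryCondition
import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
import com.amazonaws.services.sqs.model.SendMessageRequest

object SqsSendSketch {
  val SqsMaxRetries = 10
  val SqsRetryBaseDelay = 1000      // milliseconds
  val SqsRetryMaxDelay = 20 * 1000  // milliseconds

  // Same retry setup as ShredJob.createSqsClient: retry up to SqsMaxRetries
  // times with a full-jitter backoff between attempts
  def createSqsClient(): AmazonSQS =
    AmazonSQSClientBuilder
      .standard()
      .withClientConfiguration(
        new ClientConfiguration().withRetryPolicy(
          new RetryPolicy(
            new RetryCondition {
              override def shouldRetry(
                originalRequest: AmazonWebServiceRequest,
                exception: AmazonClientException,
                retriesAttempted: Int
              ): Boolean = retriesAttempted < SqsMaxRetries
            },
            new PredefinedBackoffStrategies.FullJitterBackoffStrategy(SqsRetryBaseDelay, SqsRetryMaxDelay),
            SqsMaxRetries,
            true
          )
        )
      )
      .build

  def main(args: Array[String]): Unit = {
    // Placeholder queue URL and body; the real job reads the queue from
    // shredConfig.storage.messageQueue and serialises the full ShreddingComplete payload
    val request = new SendMessageRequest()
      .withQueueUrl("https://sqs.eu-central-1.amazonaws.com/000000000000/shredding.fifo")
      .withMessageBody("""{"schema":"iglu:com.snowplowanalytics.snowplow.storage/shredding_complete/jsonschema/1-0-0","data":{}}""")
    request.setMessageGroupId("shredding") // message group id is required for FIFO queues
    createSqsClient().sendMessage(request)
  }
}
```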