From a3ddcc2a355cee79f31fd781aa0db3c98f4e6c98 Mon Sep 17 00:00:00 2001
From: Anton Parkhomenko
Date: Wed, 20 Jan 2021 18:44:17 +0300
Subject: [PATCH] Common: get rid of atomic-events folder (close #183)
---
 .../snowplow/rdbloader/common/Common.scala | 11 ++-
 .../rdbloader/common/LoaderMessage.scala | 4 +-
 .../snowplow/rdbloader/common/S3.scala | 14 ----
 .../snowplow/rdbloader/common/package.scala | 4 --
 .../rdbloader/discovery/DataDiscovery.scala | 4 +-
 .../rdbloader/discovery/ShreddedType.scala | 67 +++++++++++--------
 .../loading/RedshiftStatements.scala | 5 +-
 .../snowplow/rdbloader/S3Spec.scala | 44 ------------
 .../discovery/ShreddedTypeSpec.scala | 12 ++--
 .../rdbloader/loading/CommonSpec.scala | 12 ++--
 .../loading/RedshiftLoaderSpec.scala | 12 ++--
 .../snowplow/shredder/ShredJob.scala | 22 +++---
 .../shredder/spark/Serialization.scala | 7 +-
 .../snowplow/shredder/spark/Sink.scala | 62 +++++++----------
 .../shredder/transformation/EventUtils.scala | 15 ++---
 .../shredder/transformation/FinalRow.scala | 41 ------------
 .../shredder/transformation/Shredded.scala | 53 +++++++++++----
 .../snowplow/shredder/ShredJobSpec.scala | 2 +
 .../bad/InvalidEnrichedEventsSpec.scala | 7 +-
 .../shredder/bad/InvalidJsonsSpec.scala | 7 +-
 .../shredder/bad/MissingJsonSchemaSpec.scala | 7 +-
 .../shredder/bad/NotEnrichedEventsSpec.scala | 7 +-
 .../bad/SchemaValidationFailedSpec.scala | 7 +-
 .../good/CrossBatchDeduplicationSpec.scala | 8 +--
 .../shredder/good/DerivedContextsSpec.scala | 7 +-
 .../shredder/good/EmptySchemaSpec.scala | 10 +--
 .../good/EventDeduplicationSpec.scala | 10 +--
 .../good/ForwardCompatibleContextSpec.scala | 6 +-
 .../shredder/good/LinkClickEventSpec.scala | 6 +-
 .../shredder/good/MultipleJsonsSpec.scala | 4 +-
 .../good/WebsitePageContextSpec.scala | 6 +-
 .../shredder/good/tabular/NewlineSpec.scala | 6 +-
 .../good/tabular/TabularOutputSpec.scala | 8 +--
 33 files changed, 202 insertions(+), 295 deletions(-)
 delete mode 100644 modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/S3Spec.scala
 delete mode 100644 modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/transformation/FinalRow.scala

diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Common.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Common.scala
index 610ced750..0554a67ed 100644
--- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Common.scala
+++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Common.scala
@@ -12,17 +12,26 @@
  */
 package com.snowplowanalytics.snowplow.rdbloader.common
-import com.snowplowanalytics.iglu.core.SchemaKey
+import com.snowplowanalytics.iglu.core.{SchemaVer, SchemaKey}
 import com.snowplowanalytics.iglu.client.resolver.registries.Registry
 import com.snowplowanalytics.snowplow.rdbloader.common.Config.Formats
+import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{ShreddedType, Format}
 /**
  * Various common utility functions
  */
 object Common {
+  val AtomicSchema: SchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0))
+  val AtomicType = ShreddedType(AtomicSchema, Format.TSV)
+  val AtomicPath: String = entityPath(AtomicType)
+
+  def entityPath(entity: ShreddedType) = s"vendor=${entity.schemaKey.vendor}/name=${entity.schemaKey.name}/format=${entity.format.path}/model=${entity.schemaKey.version.model}"
+
   /**
    * Remove all occurrences of access key id and secret
access key from message * Helps to avoid publishing credentials on insecure channels diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/LoaderMessage.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/LoaderMessage.scala index 0f174ffc4..d8de26579 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/LoaderMessage.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/LoaderMessage.scala @@ -44,7 +44,9 @@ object LoaderMessage { SchemaKey("com.snowplowanalytics.snowplow.storage.rdbloader", "shredding_complete", "jsonschema", SchemaVer.Full(1,0,0)) /** Data format for shredded data */ - sealed trait Format extends Product with Serializable + sealed trait Format extends Product with Serializable { + def path: String = this.toString.toLowerCase + } object Format { final case object TSV extends Format final case object JSON extends Format diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/S3.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/S3.scala index ec4671ec7..0dfa72b5a 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/S3.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/S3.scala @@ -87,20 +87,6 @@ object S3 { case class BlobObject(key: Key, size: Long) - /** - * Extract `s3://path/run=YYYY-MM-dd-HH-mm-ss/atomic-events/` part from - * `s3://path/run=YYYY-MM-dd-HH-mm-ss/atomic-events/somefile` - * - * @param s string probably containing run id and atomic events subpath - * @return string refined as folder - */ - def getAtomicPath(s: Key): Option[Folder] = - s match { - case AtomicSubpathPattern(prefix, subpath, _) => - Some(Folder.coerce(prefix + "/" + subpath)) - case _ => None - } - /** * Refined type for AWS S3 key, allowing only valid S3 paths * (with `s3://` prefix and without trailing shash) diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/package.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/package.scala index a42b91172..b4a542b0e 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/package.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/package.scala @@ -30,10 +30,6 @@ import com.snowplowanalytics.snowplow.scalatracker.UUIDProvider package object common { - /** * Subpath to check `atomic-events` directory presence */ - val AtomicSubpathPattern = "(.*)/(run=[0-9]{4}-[0-1][0-9]-[0-3][0-9]-[0-2][0-9]-[0-6][0-9]-[0-6][0-9]/atomic-events)/(.*)".r - // year month day hour minute second - implicit val catsClockIdInstance: Clock[Id] = new Clock[Id] { override def realTime(unit: TimeUnit): Id[Long] = unit.convert(System.currentTimeMillis(), MILLISECONDS) diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala index d850bcd95..678756478 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala @@ -17,7 +17,7 @@ import cats.implicits._ import com.snowplowanalytics.snowplow.rdbloader.common.Config.Compression import 
com.snowplowanalytics.snowplow.rdbloader.{DiscoveryStep, DiscoveryStream, LoaderError, LoaderAction, State} -import com.snowplowanalytics.snowplow.rdbloader.common.{S3, Message, Config, LoaderMessage} +import com.snowplowanalytics.snowplow.rdbloader.common.{Config, Message, LoaderMessage, S3, Common} import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, AWS, Cache} /** @@ -34,7 +34,7 @@ case class DataDiscovery(base: S3.Folder, shreddedTypes: List[ShreddedType], com /** `atomic-events` directory full path */ def atomicEvents: S3.Folder = - S3.Folder.append(base, "atomic-events") + S3.Folder.append(base, Common.AtomicPath) def show: String = { val shreddedTypesList = shreddedTypes.map(x => s" * ${x.show}").mkString("\n") diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala index ecc8216e3..fd6f9af32 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala @@ -17,10 +17,10 @@ import scala.util.matching.Regex import cats.{Apply, Monad} import cats.implicits._ -import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaVer, SchemaKey} +import com.snowplowanalytics.iglu.core.SchemaCriterion import com.snowplowanalytics.snowplow.rdbloader.DiscoveryAction -import com.snowplowanalytics.snowplow.rdbloader.common.{S3, LoaderMessage, Semver} +import com.snowplowanalytics.snowplow.rdbloader.common.{S3, LoaderMessage, Semver, Common} import com.snowplowanalytics.snowplow.rdbloader.dsl.{AWS, Cache} import com.snowplowanalytics.snowplow.rdbloader.common.Common.toSnakeCase @@ -31,6 +31,15 @@ sealed trait ShreddedType { def getLoadPath: String /** Human-readable form */ def show: String + + /** Check if this type is special atomic type */ + def isAtomic = this match { + case ShreddedType.Tabular(ShreddedType.Info(_, vendor, name, model, _)) => + vendor == Common.AtomicSchema.vendor && name == Common.AtomicSchema.name && model == Common.AtomicSchema.version.model + case _ => + false + } + } /** @@ -46,7 +55,7 @@ object ShreddedType { */ final case class Json(info: Info, jsonPaths: S3.Key) extends ShreddedType { def getLoadPath: String = - s"${info.base}shredded-types/vendor=${info.vendor}/name=${info.name}/format=jsonschema/version=${info.model}-" + s"${info.base}vendor=${info.vendor}/name=${info.name}/format=json/model=${info.model}" def show: String = s"${info.toCriterion.asString} ($jsonPaths)" } @@ -59,7 +68,7 @@ object ShreddedType { */ final case class Tabular(info: Info) extends ShreddedType { def getLoadPath: String = - s"${info.base}shredded-tsv/vendor=${info.vendor}/name=${info.name}/format=jsonschema/version=${info.model}" + s"${info.base}vendor=${info.vendor}/name=${info.name}/format=tsv/model=${info.model}" def show: String = s"${info.toCriterion.asString} TSV" } @@ -69,7 +78,7 @@ object ShreddedType { * It cannot be counted as "final" shredded type, * as it's not proven to have JSONPaths file * - * @param base s3 path run folder (without `shredded-types` suffix) + * @param base s3 path run folder * @param vendor self-describing type's vendor * @param name self-describing type's name * @param model self-describing type's SchemaVer model @@ -113,24 +122,22 @@ object ShreddedType { /** Regex to extract `SchemaKey` from `shredded/good` */ val ShreddedSubpathPattern: 
Regex = - ("""shredded\-types""" + - """/vendor=(?[a-zA-Z0-9-_.]+)""" + + ("""vendor=(?[a-zA-Z0-9-_.]+)""" + """/name=(?[a-zA-Z0-9-_]+)""" + - """/format=(?[a-zA-Z0-9-_]+)""" + - """/version=(?[1-9][0-9]*(?:-(?:0|[1-9][0-9]*)){2})$""").r + """/format=json""" + + """/model=(?[1-9][0-9]*)$""").r /** Regex to extract `SchemaKey` from `shredded/good` */ val ShreddedSubpathPatternTabular: Regex = - ("""shredded\-tsv""" + - """/vendor=(?[a-zA-Z0-9-_.]+)""" + - """/name=(?[a-zA-Z0-9-_]+)""" + - """/format=(?[a-zA-Z0-9-_]+)""" + - """/version=(?[1-9][0-9]*)$""").r + ("""vendor=(?[a-zA-Z0-9-_.]+)""" + + """/name=(?[a-zA-Z0-9-_]+)""" + + """/format=tsv""" + + """/model=(?[1-9][0-9]*)$""").r /** - * "shredded-types" + vendor + name + format + version + filename + * vendor + name + format + version + filename */ - private val MinShreddedPathLengthModern = 6 + private val MinShreddedPathLengthModern = 5 /** * Check where JSONPaths file for particular shredded type exists: @@ -212,13 +219,13 @@ object ShreddedType { */ def transformPath(key: S3.Key, shredJob: Semver): Either[DiscoveryFailure, (Boolean, Info)] = { val (bucket, path) = S3.splitS3Key(key) - val (subpath, shredpath) = splitFilpath(path) + val (subpath, shredpath) = splitFilepath(path) extractSchemaKey(shredpath) match { - case Some(Extracted.Legacy(SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)))) => + case Some(Extracted.Legacy(vendor, name, model)) => val prefix = S3.Folder.coerce("s3://" + bucket + "/" + subpath) val result = Info(prefix, vendor, name, model, shredJob) (false, result).asRight - case Some(Extracted.Tabular(vendor, name, _, model)) => + case Some(Extracted.Tabular(vendor, name, model)) => val prefix = S3.Folder.coerce("s3://" + bucket + "/" + subpath) val result = Info(prefix, vendor, name, model, shredJob) (true, result).asRight @@ -229,8 +236,8 @@ object ShreddedType { sealed trait Extracted object Extracted { - final case class Legacy(key: SchemaKey) extends Extracted - final case class Tabular(vendor: String, name: String, format: String, model: Int) extends Extracted + final case class Legacy(vendor: String, name: String, model: Int) extends Extracted + final case class Tabular(vendor: String, name: String, model: Int) extends Extracted } /** @@ -244,15 +251,19 @@ object ShreddedType { */ def extractSchemaKey(subpath: String): Option[Extracted] = subpath match { - case ShreddedSubpathPattern(vendor, name, format, version) => - val uri = s"iglu:$vendor/$name/$format/$version" - SchemaKey.fromUri(uri).toOption.map(Extracted.Legacy) - case ShreddedSubpathPatternTabular(vendor, name, format, model) => + case ShreddedSubpathPattern(vendor, name, model) => + scala.util.Try(model.toInt).toOption match { + case Some(m) => Extracted.Legacy(vendor, name, m).some + case None => None + } + case ShreddedSubpathPatternTabular(vendor, name, model) => scala.util.Try(model.toInt).toOption match { - case Some(m) => Extracted.Tabular(vendor, name, format, m).some + case Some(m) => Extracted.Tabular(vendor, name, m).some case None => None } - case _ => None + case _ => + println(subpath) + None } /** @@ -266,7 +277,7 @@ object ShreddedType { * @param path S3 key without bucket name * @return pair of subpath and shredpath */ - private def splitFilpath(path: String): (String, String) = + private def splitFilepath(path: String): (String, String) = path.split("/").reverse.splitAt(MinShreddedPathLengthModern) match { case (reverseSchema, reversePath) => (reversePath.reverse.mkString("/"), reverseSchema.tail.reverse.mkString("/")) diff 
--git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftStatements.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftStatements.scala index 03bd2a3b8..baafe35db 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftStatements.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftStatements.scala @@ -57,7 +57,10 @@ object RedshiftStatements { * More than one `RedshiftLoadStatements` must be grouped with others using `buildQueue` */ private[loading] def getStatements(config: Config[Redshift], discovery: DataDiscovery): RedshiftStatements = { - val shreddedStatements = discovery.shreddedTypes.map(transformShreddedType(config, discovery.compression)) + val shreddedStatements = discovery + .shreddedTypes + .filterNot(_.isAtomic) + .map(transformShreddedType(config, discovery.compression)) val transitCopy = config.steps.contains(Step.TransitCopy) val compressionFormat = getCompressionFormat(discovery.compression) val atomic = buildEventsCopy(config, discovery.atomicEvents, transitCopy, compressionFormat) diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/S3Spec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/S3Spec.scala deleted file mode 100644 index 94bc5a79d..000000000 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/S3Spec.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2012-2019 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
- */ -package com.snowplowanalytics.snowplow.rdbloader - -import com.snowplowanalytics.snowplow.rdbloader.common.S3 - -import org.joda.time.DateTime -import org.joda.time.format.DateTimeFormat -import org.scalacheck.Gen -import org.specs2.{Specification, ScalaCheck} - - -class S3Spec extends Specification with ScalaCheck { def is = s2""" - Always extract atomic events path from valid S3 key $e1 - """ - - def e1 = { - - val twoThousandTens = - Gen.chooseNum(1348368300000L, 1569206700000L) - - prop { (timestamp: Long, file: String) => - val dateTime = new DateTime(timestamp) - val format = DateTimeFormat.forPattern("YYYY-MM-dd-HH-mm-ss") - val id = format.print(dateTime) - val runId = s"s3://bucket/run=$id/atomic-events/$file" - val s3Key = S3.Key.coerce(runId) - S3.getAtomicPath(s3Key) must beSome.like { - case folder => folder must endWith("atomic-events/") - } - }.setGen1(twoThousandTens).setGen2(Gen.alphaStr) - } -} - diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala index 236263017..8ddb2a705 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedTypeSpec.scala @@ -47,15 +47,15 @@ class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2""" } def e3 = { - val path = "vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00000-00001" - val key = S3.Key.coerce(s"s3://rdb-test/shredded-types/$path") + val path = "vendor=com.snowplowanalytics.snowplow/name=submit_form/format=json/model=1/part-00000-00001" + val key = S3.Key.coerce(s"s3://rdb-test/$path") val result = ShreddedType.transformPath(key, Semver(0,13,0)) val expected = (false, Info(S3.Folder.coerce("s3://rdb-test"), "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,13,0))) result must beRight(expected) } def e4 = { - val key = S3.Key.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/shredded-types/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1-0-0/part-00000-00001") + val key = S3.Key.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=json/model=1/part-00000-00001") val expectedPrefix = S3.Folder.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/") val expected = (false, Info(expectedPrefix, "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,13,0))) @@ -65,12 +65,12 @@ class ShreddedTypeSpec extends Specification with ScalaCheck { def is = s2""" } def e5 = { - val input = "shredded-tsv/vendor=com.snowplow/name=event/format=jsonschema/version=1" - ShreddedType.extractSchemaKey(input) must beSome(Extracted.Tabular("com.snowplow", "event", "jsonschema", 1)) + val input = "vendor=com.snowplow/name=event/format=tsv/model=1" + ShreddedType.extractSchemaKey(input) must beSome(Extracted.Tabular("com.snowplow", "event", 1)) } def e6 = { - val key = S3.Key.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/shredded-tsv/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=jsonschema/version=1/part-00000-00001") + val key = S3.Key.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/vendor=com.snowplowanalytics.snowplow/name=submit_form/format=tsv/model=1/part-00000-00001") val expectedPrefix = 
S3.Folder.coerce("s3://snowplow-shredded/good/run=2017-06-14-12-07-11/") val expected = (true, Info(expectedPrefix, "com.snowplowanalytics.snowplow", "submit_form", 1, Semver(0,16,0))) diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/CommonSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/CommonSpec.scala index 32d6be5ed..ce39a9e68 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/CommonSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/CommonSpec.scala @@ -39,8 +39,8 @@ class CommonSpec extends Specification { val expected = List( "BEGIN", - "COPY atomic.events FROM 's3://shredded/base/atomic-events/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", - "COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/shredded-types/vendor=com.acme/name=json-context/format=jsonschema/version=1-' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.events FROM 's3://shredded/base/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", "COMMIT" ) @@ -59,8 +59,8 @@ class CommonSpec extends Specification { val expected = List( "BEGIN", - "COPY atomic.events FROM 's3://shredded/base/atomic-events/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", - "COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/shredded-types/vendor=com.acme/name=json-context/format=jsonschema/version=1-' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.events FROM 's3://shredded/base/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", "ACK", "COMMIT" ) @@ -80,8 +80,8 @@ class CommonSpec extends Specification { val expected = List( "BEGIN", - "COPY atomic.events FROM 's3://shredded/base/atomic-events/' CREDENTIALS 
'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", - "COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/shredded-types/vendor=com.acme/name=json-context/format=jsonschema/version=1-' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.events FROM 's3://shredded/base/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.com_acme_json_context_1 FROM 's3://shredded/base/vendor=com.acme/name=json-context/format=json/model=1' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/com.acme/json_context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", ) val result = Common.load[Pure](SpecHelpers.validCliConfig, message).runS diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftLoaderSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftLoaderSpec.scala index 6b891138a..864bb700d 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftLoaderSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/RedshiftLoaderSpec.scala @@ -87,23 +87,27 @@ class RedshiftLoaderSpec extends Specification { val shreddedTypes = List( ShreddedType.Json(ShreddedType.Info(S3.Folder.coerce("s3://my-bucket/my-path"), "com.acme", "event", 2, Semver(1,5,0)), S3.Key.coerce("s3://assets/event_1.json")), - ShreddedType.Json(ShreddedType.Info(S3.Folder.coerce("s3://my-bucket/my-path"), "com.acme", "context", 2, Semver(1,5,0)), S3.Key.coerce("s3://assets/context_1.json")) + ShreddedType.Json(ShreddedType.Info(S3.Folder.coerce("s3://my-bucket/my-path"), "com.acme", "context", 2, Semver(1,5,0)), S3.Key.coerce("s3://assets/context_2.json")), + ShreddedType.Tabular(ShreddedType.Info(S3.Folder.coerce("s3://my-bucket/my-path"), "com.acme", "context", 3, Semver(1,5,0))), ) val discovery = DataDiscovery(S3.Folder.coerce("s3://my-bucket/my-path"), shreddedTypes, Compression.Gzip) val (state, result) = RedshiftLoader.run[Pure](SpecHelpers.validConfig.copy(steps = Set(Step.Vacuum, Step.Analyze)), discovery).flatMap(identity).run val expected = List( - "COPY atomic.events FROM 's3://my-bucket/my-path/atomic-events/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", - "COPY atomic.com_acme_event_2 FROM 's3://my-bucket/my-path/shredded-types/vendor=com.acme/name=event/format=jsonschema/version=2-' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/event_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", - "COPY atomic.com_acme_context_2 FROM 's3://my-bucket/my-path/shredded-types/vendor=com.acme/name=context/format=jsonschema/version=2-' CREDENTIALS 
'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/context_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.events FROM 's3://my-bucket/my-path/vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1/' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' EMPTYASNULL FILLRECORD TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.com_acme_event_2 FROM 's3://my-bucket/my-path/vendor=com.acme/name=event/format=json/model=2' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/event_1.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.com_acme_context_2 FROM 's3://my-bucket/my-path/vendor=com.acme/name=context/format=json/model=2' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' JSON AS 's3://assets/context_2.json' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", + "COPY atomic.com_acme_context_3 FROM 's3://my-bucket/my-path/vendor=com.acme/name=context/format=tsv/model=3' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789876:role/RedshiftLoadRole' REGION AS 'us-east-1' MAXERROR 1 TIMEFORMAT 'auto' DELIMITER ' ' TRUNCATECOLUMNS ACCEPTINVCHARS GZIP", "VACUUM SORT ONLY atomic.events", "VACUUM SORT ONLY atomic.com_acme_event_2", "VACUUM SORT ONLY atomic.com_acme_context_2", + "VACUUM SORT ONLY atomic.com_acme_context_3", "BEGIN", "ANALYZE atomic.events", "ANALYZE atomic.com_acme_event_2", "ANALYZE atomic.com_acme_context_2", + "ANALYZE atomic.com_acme_context_3", "COMMIT" ) diff --git a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/ShredJob.scala b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/ShredJob.scala index b4b370aed..0bb6131b6 100644 --- a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/ShredJob.scala +++ b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/ShredJob.scala @@ -33,7 +33,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage._ import com.snowplowanalytics.snowplow.rdbloader.common.S3.Folder import com.snowplowanalytics.snowplow.shredder.Discovery.MessageProcessor -import com.snowplowanalytics.snowplow.shredder.transformation.{FinalRow, EventUtils} +import com.snowplowanalytics.snowplow.shredder.transformation.{ EventUtils, Shredded } import com.snowplowanalytics.snowplow.shredder.spark.{singleton, Sink, ShreddedTypesAccumulator, TimestampsAccumulator} /** @@ -133,34 +133,30 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: CliConfig) exten timestampsAccumulator.add(event) val isSyntheticDupe = syntheticDupesBroadcasted.value.contains(event.event_id) val withDupeContext = if (isSyntheticDupe) Deduplication.withSynthetic(event) else event - FinalRow.shred(shredConfig.igluConfig, isTabular, atomicLengths)(withDupeContext) + Shredded.fromEvent(shredConfig.igluConfig, isTabular, atomicLengths)(withDupeContext) } }.cache() - val shreddedGood = shredded.flatMap(_.toOption) - - // Ready the events for database load - val events = shreddedGood.map(_.atomic) - // Update the shredded JSONs with the new deduplicated event IDs and stringify - val shreddedData = shreddedGood.flatMap(_.shredded) + val shreddedData = shredded.flatMap(_.getOrElse(Nil)) // Data that failed TSV transformation val shreddedBad 
= (common.flatMap(_.swap.toOption) ++ shredded.flatMap(_.swap.toOption)).map(bad => Row(bad.compact)) - // Write as strings to `atomic-events` directory - Sink.writeEvents(spark, shredder.compression, events, outFolder) - // Final output Sink.writeShredded(spark, shredder.compression, shredConfig.config.formats, shreddedData, outFolder) // Bad data Sink.writeBad(spark, shredder.compression, shreddedBad, badFolder) - val shreddedTypes = (shreddedTypesAccumulator.value).toList + val shreddedTypes = shreddedTypesAccumulator.value.toList val batchTimestamps = timestampsAccumulator.value val timestamps = Timestamps(jobStarted, Instant.now(), batchTimestamps.map(_.min), batchTimestamps.map(_.max)) - LoaderMessage.ShreddingComplete(outFolder, shreddedTypes, timestamps, shredder.compression, MessageProcessor) + + val isEmpty = batchTimestamps.isEmpty || shreddedTypes.isEmpty || shreddedData.isEmpty() // RDD.isEmpty called as last resort + val finalShreddedTypes = if (isEmpty) Nil else ShreddedType(Common.AtomicSchema, Format.TSV) :: shreddedTypes + + LoaderMessage.ShreddingComplete(outFolder, finalShreddedTypes, timestamps, shredder.compression, MessageProcessor) } } diff --git a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/spark/Serialization.scala b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/spark/Serialization.scala index 1081697ff..716d36428 100644 --- a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/spark/Serialization.scala +++ b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/spark/Serialization.scala @@ -3,10 +3,10 @@ package com.snowplowanalytics.snowplow.shredder.spark import java.util.UUID import java.time.Instant -import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingData} +import com.snowplowanalytics.iglu.core.{SelfDescribingData, SchemaKey} import com.snowplowanalytics.snowplow.analytics.scalasdk.Event -import com.snowplowanalytics.snowplow.shredder.transformation.{FinalRow, Hierarchy} +import com.snowplowanalytics.snowplow.shredder.transformation.{Hierarchy, Shredded} object Serialization { val classesToRegister: Array[Class[_]] = Array( @@ -16,8 +16,9 @@ object Serialization { classOf[SelfDescribingData[_]], classOf[Event], classOf[Hierarchy], - classOf[FinalRow], classOf[Instant], + classOf[Array[Shredded]], + classOf[Shredded.Tabular], classOf[UUID], Class.forName("com.snowplowanalytics.iglu.core.SchemaVer$Full"), Class.forName("io.circe.JsonObject$LinkedHashMapJsonObject"), diff --git a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/spark/Sink.scala b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/spark/Sink.scala index 5172fbd27..83c9cc024 100644 --- a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/spark/Sink.scala +++ b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/spark/Sink.scala @@ -1,38 +1,45 @@ +/* + * Copyright (c) 2012-2021 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. 
+ * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and + * limitations there under. + */ package com.snowplowanalytics.snowplow.shredder.spark -import com.snowplowanalytics.snowplow.rdbloader.common.Config.Compression -import com.snowplowanalytics.snowplow.rdbloader.common.{Config, LoaderMessage} - import org.apache.spark.rdd.RDD import org.apache.spark.sql.{SparkSession, Row, SaveMode, DataFrameWriter} import org.apache.spark.sql.types.{StructField, StructType, StringType} + +import com.snowplowanalytics.snowplow.rdbloader.common.Config.Compression +import com.snowplowanalytics.snowplow.rdbloader.common.{Config, LoaderMessage} + import com.snowplowanalytics.snowplow.shredder.transformation.Shredded object Sink { - def writeEvents(spark: SparkSession, compression: Compression, events: RDD[Row], outFolder: String): Unit = - spark.createDataFrame(events, StructType(StructField("_", StringType, true) :: Nil)) - .write - .withCompression(compression) - .mode(SaveMode.Overwrite) - .text(getAlteredEnrichedOutputPath(outFolder)) - def writeShredded(spark: SparkSession, compression: Compression, formats: Config.Formats, shreddedData: RDD[Shredded], outFolder: String): Unit = { + writeShredded(spark, compression, shreddedData.flatMap(_.tsv), outFolder) val canBeJson = formats.default == LoaderMessage.Format.JSON || formats.json.nonEmpty - val canBeTsv = formats.default == LoaderMessage.Format.TSV || formats.tsv.nonEmpty - if (canBeJson) writeShredded(spark, compression, shreddedData.flatMap(_.json), true, outFolder) - if (canBeTsv) writeShredded(spark, compression, shreddedData.flatMap(_.tabular), false, outFolder) + if (canBeJson) writeShredded(spark, compression, shreddedData.flatMap(_.json), outFolder) } - def writeShredded(spark: SparkSession, compression: Compression, data: RDD[(String, String, String, String, String)], json: Boolean, outFolder: String): Unit = { + def writeShredded(spark: SparkSession, compression: Compression, data: RDD[(String, String, String, Int, String)], outFolder: String): Unit = { import spark.implicits._ data - .toDF("vendor", "name", "format", "version", "data") + .toDF("vendor", "name", "format", "model", "data") .write .withCompression(compression) - .partitionBy("vendor", "name", "format", "version") + .partitionBy("vendor", "name", "format", "model") .mode(SaveMode.Append) - .text(getShreddedTypesOutputPath(outFolder, json)) + .text(outFolder) } def writeBad(spark: SparkSession, compression: Compression, shreddedBad: RDD[Row], outFolder: String): Unit = @@ -42,27 +49,6 @@ object Sink { .mode(SaveMode.Overwrite) .text(outFolder) - - /** - * The path at which to store the shredded types. - * @param outFolder shredded/good/run=xxx - * @param json pre-R31 output path - * @return The shredded types output path - */ - def getShreddedTypesOutputPath(outFolder: String, json: Boolean): String = { - val shreddedTypesSubdirectory = if (json) "shredded-types" else "shredded-tsv" - s"$outFolder${if (outFolder.endsWith("/")) "" else "/"}$shreddedTypesSubdirectory" - } - /** - * The path at which to store the altered enriched events. 
- * @param outFolder shredded/good/run=xxx - * @return The altered enriched event path - */ - def getAlteredEnrichedOutputPath(outFolder: String): String = { - val alteredEnrichedEventSubdirectory = "atomic-events" - s"$outFolder${if (outFolder.endsWith("/")) "" else "/"}$alteredEnrichedEventSubdirectory" - } - private implicit class DataframeOps[A](w: DataFrameWriter[A]) { def withCompression(compression: Compression): DataFrameWriter[A] = compression match { diff --git a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/transformation/EventUtils.scala b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/transformation/EventUtils.scala index 1531da2be..5830efbab 100644 --- a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/transformation/EventUtils.scala +++ b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/transformation/EventUtils.scala @@ -19,10 +19,10 @@ import java.time.Instant import java.time.format.DateTimeParseException import io.circe.Json - import cats.{Id, Monad} import cats.data.{EitherT, NonEmptyList} import cats.implicits._ + import cats.effect.Clock import com.snowplowanalytics.iglu.core._ @@ -35,21 +35,14 @@ import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._ import com.snowplowanalytics.snowplow.analytics.scalasdk.{ParsingError, Event} - import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor, Failure, Payload, FailureDetails} - -import com.snowplowanalytics.snowplow.rdbloader.common.catsClockIdInstance +import com.snowplowanalytics.snowplow.rdbloader.common.{catsClockIdInstance, Common} import com.snowplowanalytics.snowplow.rdbloader.common.Flattening.{NullCharacter, getOrdered} - import com.snowplowanalytics.snowplow.rdbloader.generated.ProjectMetadata - import com.snowplowanalytics.snowplow.shredder.spark.singleton object EventUtils { - val AtomicSchema: SchemaKey = - SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)) - val BadRowsProcessor: Processor = Processor(ProjectMetadata.name, ProjectMetadata.version) /** @@ -70,8 +63,8 @@ object EventUtils { singleton.IgluSingleton .get(iglu) .resolver - .lookupSchema(AtomicSchema) - .leftMap(error => s"RDB Shredder could not fetch ${AtomicSchema.toSchemaUri} schema at initialization. ${(error: ClientError).show}") + .lookupSchema(Common.AtomicSchema) + .leftMap(error => s"RDB Shredder could not fetch ${Common.AtomicSchema.toSchemaUri} schema at initialization. ${(error: ClientError).show}") .flatMap(json => Schema.parse(json).flatMap(_.properties).map(_.value).toRight("atomic schema does not conform expected format")) .map(_.flatMap { case (k, v) => getLength(v).map { l => (k, l)}}) .flatMap(lengths => if (lengths.isEmpty) "atomic schema properties is empty".asLeft else lengths.asRight) diff --git a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/transformation/FinalRow.scala b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/transformation/FinalRow.scala deleted file mode 100644 index 954d1ab77..000000000 --- a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/transformation/FinalRow.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved. 
- * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.shredder.transformation - -import cats.implicits._ - -import io.circe.Json - -import com.snowplowanalytics.iglu.core.SchemaKey - -import com.snowplowanalytics.snowplow.analytics.scalasdk.Event -import com.snowplowanalytics.snowplow.badrows.BadRow - -import com.snowplowanalytics.snowplow.shredder.spark.singleton - -import org.apache.spark.sql.Row - -/** Final stage of event. After this, it can be shredded into different folders */ -case class FinalRow(atomic: Row, shredded: List[Shredded]) - -object FinalRow { - - def shred(igluConfig: Json, isTabular: SchemaKey => Boolean, atomicLengths: Map[String, Int])(event: Event): Either[BadRow, FinalRow] = - Hierarchy.fromEvent(event).traverse { hierarchy => - val tabular = isTabular(hierarchy.entity.schema) - Shredded.fromHierarchy(tabular, singleton.IgluSingleton.get(igluConfig).resolver)(hierarchy).toValidatedNel - }.leftMap(EventUtils.shreddingBadRow(event)).toEither.map { shredded => - val row = Row(EventUtils.alterEnrichedEvent(event, atomicLengths)) - FinalRow(row, shredded) - } -} \ No newline at end of file diff --git a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/transformation/Shredded.scala b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/transformation/Shredded.scala index a1a247125..878efeb18 100644 --- a/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/transformation/Shredded.scala +++ b/modules/shredder/src/main/scala/com/snowplowanalytics/snowplow/shredder/transformation/Shredded.scala @@ -16,33 +16,41 @@ package com.snowplowanalytics.snowplow.shredder.transformation import cats.Id import cats.data.EitherT +import cats.implicits._ -import com.snowplowanalytics.iglu.client.Resolver +import io.circe.{Json => JSON} + +import com.snowplowanalytics.iglu.core.SchemaKey -import com.snowplowanalytics.snowplow.badrows.FailureDetails +import com.snowplowanalytics.iglu.client.Resolver +import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.badrows.{BadRow, FailureDetails} import com.snowplowanalytics.snowplow.rdbloader.common.catsClockIdInstance +import com.snowplowanalytics.snowplow.rdbloader.common.Common.AtomicSchema +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.Format +import com.snowplowanalytics.snowplow.shredder.spark.singleton /** ADT, representing possible forms of data in blob storage */ sealed trait Shredded { - def json: Option[(String, String, String, String, String)] = this match { - case Shredded.Json(vendor, name, format, version, data) => Some((vendor, name, format, version, data)) - case Shredded.Tabular(_, _, _, _, _) => None + def json: Option[(String, String, String, Int, String)] = this match { + case Shredded.Json(vendor, name, version, data) => Some((vendor, name, Format.JSON.path, version, data)) + case 
Shredded.Tabular(_, _, _, _) => None } - def tabular: Option[(String, String, String, String, String)] = this match { - case Shredded.Tabular(vendor, name, format, version, data) => Some((vendor, name, format, version, data)) - case Shredded.Json(_, _, _, _, _) => None + def tsv: Option[(String, String, String, Int, String)] = this match { + case Shredded.Tabular(vendor, name, version, data) => Some((vendor, name, Format.TSV.path, version, data)) + case Shredded.Json(_, _, _, _) => None } } object Shredded { /** Data will be represented as JSON, with RDB Loader loading it using JSON Paths. Legacy format */ - case class Json(vendor: String, name: String, format: String, version: String, data: String) extends Shredded + case class Json(vendor: String, name: String, model: Int, data: String) extends Shredded /** Data will be represented as TSV, with RDB Loader loading it directly */ - case class Tabular(vendor: String, name: String, format: String, version: String, data: String) extends Shredded + case class Tabular(vendor: String, name: String, model: Int, data: String) extends Shredded /** * Transform JSON `Hierarchy`, extracted from enriched into a `Shredded` entity, @@ -56,13 +64,32 @@ object Shredded { def fromHierarchy(tabular: Boolean, resolver: => Resolver[Id])(hierarchy: Hierarchy): Either[FailureDetails.LoaderIgluError, Shredded] = { val vendor = hierarchy.entity.schema.vendor val name = hierarchy.entity.schema.name - val format = hierarchy.entity.schema.format val result: EitherT[Id, FailureDetails.LoaderIgluError, Shredded] = if (tabular) EventUtils.flatten(resolver, hierarchy.entity).map { columns => val meta = EventUtils.buildMetadata(hierarchy.eventId, hierarchy.collectorTstamp, hierarchy.entity.schema) - Tabular(vendor, name, format, hierarchy.entity.schema.version.model.toString, (meta ++ columns).mkString("\t")) - } else EitherT.pure[Id, FailureDetails.LoaderIgluError](Json(vendor, name, format, hierarchy.entity.schema.version.asString, hierarchy.dumpJson)) + Tabular(vendor, name, hierarchy.entity.schema.version.model, (meta ++ columns).mkString("\t")) + } else EitherT.pure[Id, FailureDetails.LoaderIgluError](Json(vendor, name, hierarchy.entity.schema.version.model, hierarchy.dumpJson)) result.value } + + /** + * Parse snowplow enriched event into a list of shredded (either JSON or TSV, according to settings) entities + * TSV will be flattened according to their schema, JSONs will be attached as is + * + * @param igluConfig Iglu Resolver config + * @param isTabular predicate to decide whether output should be JSON or TSV + * @param atomicLengths a map to trim atomic event columns + * @param event enriched event + * @return either bad row (in case of failed flattening) or list of shredded entities inside original event + */ + def fromEvent(igluConfig: JSON, isTabular: SchemaKey => Boolean, atomicLengths: Map[String, Int])(event: Event): Either[BadRow, List[Shredded]] = + Hierarchy.fromEvent(event).traverse { hierarchy => + val tabular = isTabular(hierarchy.entity.schema) + fromHierarchy(tabular, singleton.IgluSingleton.get(igluConfig).resolver)(hierarchy).toValidatedNel + }.leftMap(EventUtils.shreddingBadRow(event)).toEither.map { shredded => + val data = EventUtils.alterEnrichedEvent(event, atomicLengths) + val atomic = Shredded.Tabular(AtomicSchema.vendor, AtomicSchema.name, AtomicSchema.version.model, data) + atomic :: shredded + } } \ No newline at end of file diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/ShredJobSpec.scala 
b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/ShredJobSpec.scala index ff75b409d..09c0cd8d9 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/ShredJobSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/ShredJobSpec.scala @@ -60,6 +60,8 @@ object ShredJobSpec { val Version = ProjectMetadata.version + val AtomicFolder = "vendor=com.snowplowanalytics.snowplow/name=atomic/format=tsv/model=1" + /** Case class representing the input lines written in a file. */ case class Lines(l: String*) { val lines = l.toList diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/InvalidEnrichedEventsSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/InvalidEnrichedEventsSpec.scala index fa9629c2f..a7c9dc7b0 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/InvalidEnrichedEventsSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/InvalidEnrichedEventsSpec.scala @@ -14,8 +14,6 @@ */ package com.snowplowanalytics.snowplow.shredder.bad -import java.io.File - import io.circe.literal._ import com.snowplowanalytics.snowplow.shredder.ShredJobSpec @@ -56,11 +54,8 @@ class InvalidEnrichedEventsSpec extends Specification with ShredJobSpec { jsons must beEqualTo(List(InvalidEnrichedEventsSpec.expected)) } - "not write any atomic-events" in { - new File(dirs.output, "atomic-events") must beEmptyDir - } "not write any jsons" in { - new File(dirs.output, "shredded-types") must beEmptyDir + dirs.output must beEmptyDir } } } diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/InvalidJsonsSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/InvalidJsonsSpec.scala index 8a8520c24..0fd660d8a 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/InvalidJsonsSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/InvalidJsonsSpec.scala @@ -14,8 +14,6 @@ */ package com.snowplowanalytics.snowplow.shredder.bad -import java.io.File - import io.circe.literal._ import org.specs2.mutable.Specification @@ -55,11 +53,8 @@ class InvalidJsonsSpec extends Specification with ShredJobSpec { jsons must beEqualTo(List(InvalidJsonsSpec.expected)) } - "not write any atomic-events" in { - new File(dirs.output, "atomic-events") must beEmptyDir - } "not write any jsons" in { - new File(dirs.output, "shredded-types") must beEmptyDir + dirs.output must beEmptyDir } } } diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/MissingJsonSchemaSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/MissingJsonSchemaSpec.scala index 8e033936d..9b75b4c69 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/MissingJsonSchemaSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/MissingJsonSchemaSpec.scala @@ -14,8 +14,6 @@ */ package com.snowplowanalytics.snowplow.shredder.bad -import java.io.File - import io.circe.literal._ import org.specs2.mutable.Specification @@ -241,11 +239,8 @@ class MissingJsonSchemaSpec extends Specification with ShredJobSpec { jsons.map(setTimestamp("2019-07-18T05:18:27.439Z")) must beEqualTo(List(MissingJsonSchemaSpec.expected)) } - "not write any atomic-events" in { - new File(dirs.output, 
"atomic-events") must beEmptyDir - } "not write any jsons" in { - new File(dirs.output, "shredded-types") must beEmptyDir + dirs.output must beEmptyDir } } } diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/NotEnrichedEventsSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/NotEnrichedEventsSpec.scala index 7856e29ef..2ca555d6d 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/NotEnrichedEventsSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/NotEnrichedEventsSpec.scala @@ -14,8 +14,6 @@ */ package com.snowplowanalytics.snowplow.shredder.bad -import java.io.File - import io.circe.literal._ import org.specs2.mutable.Specification @@ -48,11 +46,8 @@ class NotEnrichedEventsSpec extends Specification with ShredJobSpec { jsons must containTheSameElementsAs(NotEnrichedEventsSpec.expected) } - "not write any atomic-events" in { - new File(dirs.output, "atomic-events") must beEmptyDir - } "not write any jsons" in { - new File(dirs.output, "shredded-types") must beEmptyDir + dirs.output must beEmptyDir } } } diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/SchemaValidationFailedSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/SchemaValidationFailedSpec.scala index b4e71d1e8..fe7420c08 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/SchemaValidationFailedSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/bad/SchemaValidationFailedSpec.scala @@ -14,8 +14,6 @@ */ package com.snowplowanalytics.snowplow.shredder.bad -import java.io.File - import io.circe.literal._ import org.specs2.mutable.Specification @@ -53,11 +51,8 @@ class SchemaValidationFailedSpec extends Specification with ShredJobSpec { jsons must beEqualTo(List(SchemaValidationFailedSpec.expected)) } - "not write any atomic-events" in { - new File(dirs.output, "atomic-events") must beEmptyDir - } "not write any jsons" in { - new File(dirs.output, "shredded-types") must beEmptyDir + dirs.output must beEmptyDir } } } diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/CrossBatchDeduplicationSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/CrossBatchDeduplicationSpec.scala index 0125425f5..3428e1adc 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/CrossBatchDeduplicationSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/CrossBatchDeduplicationSpec.scala @@ -66,7 +66,7 @@ object CrossBatchDeduplicationSpec { ) object expected { - val additionalContextPath = "shredded-types/vendor=com.snowplowanalytics.snowplow/name=ua_parser_context/format=jsonschema/version=1-0-0" + val additionalContextPath = "vendor=com.snowplowanalytics.snowplow/name=ua_parser_context/format=json/model=1" val additionalContextContents1 = s""" |{ @@ -233,13 +233,13 @@ class CrossBatchDeduplicationSpec extends Specification with ShredJobSpec { val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] - "remove cross-batch duplicate and store left event in /atomic-events" in { - val Some((lines, f)) = readPartFile(dirs.output, "atomic-events") + "remove cross-batch duplicate and store left event in atomic events folder" in { + val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) 
expectedFiles += f lines.sorted mustEqual CrossBatchDeduplicationSpec.expected.events } "shred two unique events out of cross-batch and in-batch duplicates" in { - val Some((lines, f)) = readPartFile(dirs.output, "atomic-events") + val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) expectedFiles += f val eventIds = lines.map(_.split("\t").apply(6)) eventIds must containTheSameElementsAs( diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/DerivedContextsSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/DerivedContextsSpec.scala index da39a7380..1a106559a 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/DerivedContextsSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/DerivedContextsSpec.scala @@ -26,7 +26,7 @@ object DerivedContextsSpec { ) object expected { - val path = s"shredded-types/vendor=org.schema/name=WebPage/format=jsonschema/version=1-0-0" + val path = s"vendor=org.schema/name=WebPage/format=json/model=1" val contents = s"""|{ |"schema":{ @@ -64,12 +64,13 @@ class DerivedContextsSpec extends Specification with ShredJobSpec { runShredJob(DerivedContextsSpec.lines) val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] - "transform the enriched event and store it in /atomic-events" in { - val Some((lines, f)) = readPartFile(dirs.output, "atomic-events") + "transform the enriched event and store it in atomic events" in { + val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) expectedFiles += f lines mustEqual Seq(DerivedContextsSpec.expected.event) } "shred the website page_context into its appropriate path" in { + println(dirs.output.list().toList) val Some((lines, f)) = readPartFile(dirs.output, DerivedContextsSpec.expected.path) expectedFiles += f lines mustEqual Seq(DerivedContextsSpec.expected.contents) diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/EmptySchemaSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/EmptySchemaSpec.scala index 84d007d75..bf30cd8c7 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/EmptySchemaSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/EmptySchemaSpec.scala @@ -26,13 +26,13 @@ class EmptySchemaSpec extends Specification with ShredJobSpec { runShredJob(EmptySchemaSpec.lines, tsv = true) val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] - "transform the enriched event and store it in /atomic-events" in { - readPartFile(dirs.output, "atomic-events") match { + "transform the enriched event and store it in atomic folder" in { + readPartFile(dirs.output, ShredJobSpec.AtomicFolder) match { case Some((lines, f)) => expectedFiles += f lines mustEqual Seq(EmptySchemaSpec.expected.event) case None => - ko("No data in /atomic-events") + ko("No data in atomic folder") } } "shred the context without any data into TSV with only metadata" in { @@ -73,8 +73,8 @@ object EmptySchemaSpec { val contexBContents = "com.snowplowanalytics.iglu\tanything-b\tjsonschema\t1-0-0\t2b1b25a4-c0df-4859-8201-cf21492ad61b\t2014-05-29 18:16:35.000\tevents\t[\"events\",\"anything-b\"]\tevents" - val contextAPath = s"shredded-tsv/vendor=com.snowplowanalytics.iglu/name=anything-a/format=jsonschema/version=1" - val contextBPath = 
s"shredded-tsv/vendor=com.snowplowanalytics.iglu/name=anything-b/format=jsonschema/version=1" + val contextAPath = "vendor=com.snowplowanalytics.iglu/name=anything-a/format=tsv/model=1" + val contextBPath = "vendor=com.snowplowanalytics.iglu/name=anything-b/format=tsv/model=1" // Removed three JSON columns and added 7 columns at the end val event = """snowplowweb web 2014-06-01 14:04:11.639 2014-05-29 18:16:35.000 2014-05-29 18:04:11.639 page_view 2b1b25a4-c0df-4859-8201-cf21492ad61b 836413 clojure js-2.0.0-M2 clj-0.6.0-tom-0.0.4 hadoop-0.5.0-common-0.4.0 216.207.42.134 3499345421 3b1d1a375044eede 3 2bad2a4e-aae4-4bea-8acd-399e7fe0366a US CA South San Francisco 37.654694 -122.4077 http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/ Writing Hive UDFs - a tutorial http snowplowanalytics.com 80 /blog/2013/02/08/writing-hive-udfs-and-serdes/ Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14 Safari Safari Browser WEBKIT en-us 0 0 0 0 0 0 0 0 0 1 24 1440 1845 Mac OS Mac OS Apple Inc. America/Los_Angeles Computer 0 1440 900 UTF-8 1440 6015 """ diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/EventDeduplicationSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/EventDeduplicationSpec.scala index 3d0f809e0..8ee7e47ec 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/EventDeduplicationSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/EventDeduplicationSpec.scala @@ -43,7 +43,7 @@ object EventDeduplicationSpec { ) object expected { - val path = "shredded-types/vendor=com.snowplowanalytics.snowplow/name=duplicate/format=jsonschema/version=1-0-0" + val path = "vendor=com.snowplowanalytics.snowplow/name=duplicate/format=json/model=1" val contents = s"""|{ |"schema":{ @@ -64,7 +64,7 @@ object EventDeduplicationSpec { |} |}""".stripMargin.replaceAll("[\n\r]","") - val additionalContextPath = "shredded-types/vendor=com.snowplowanalytics.snowplow/name=ua_parser_context/format=jsonschema/version=1-0-0" + val additionalContextPath = "vendor=com.snowplowanalytics.snowplow/name=ua_parser_context/format=json/model=1" val additionalContextContents = s""" |{ @@ -133,14 +133,14 @@ class EventDeduplicationSpec extends Specification with ShredJobSpec { runShredJob(EventDeduplicationSpec.lines) val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] - "transform two enriched events and store them in /atomic-events" in { - val Some((lines, f)) = readPartFile(dirs.output, "atomic-events") + "transform two enriched events and store them in atomic events folder" in { + val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) expectedFiles += f val updatedLines = lines.map(EventDeduplicationSpec.eraseEventId) updatedLines.sorted mustEqual EventDeduplicationSpec.expected.events } "shred two enriched events with deduplicated event ids" in { - val Some((lines, f)) = readPartFile(dirs.output, "atomic-events") + val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) expectedFiles += f val eventIds = lines.map(_.split("\t").apply(6)) diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/ForwardCompatibleContextSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/ForwardCompatibleContextSpec.scala index 3954dea20..aceb44ded 100644 --- 
a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/ForwardCompatibleContextSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/ForwardCompatibleContextSpec.scala @@ -26,7 +26,7 @@ object ForwardCompatibleContextSpec { ) object expected { - val path = "shredded-types/vendor=org.schema/name=WebPage/format=jsonschema/version=1-0-0" + val path = "vendor=org.schema/name=WebPage/format=json/model=1" val contents = s"""|{ |"schema":{ @@ -64,8 +64,8 @@ class ForwardCompatibleContextSpec extends Specification with ShredJobSpec { runShredJob(ForwardCompatibleContextSpec.lines) val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] - "transform the enriched event and store it in /atomic-events" in { - val Some((lines, f)) = readPartFile(dirs.output, "atomic-events") + "transform the enriched event and store it in atomic events folder" in { + val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) expectedFiles += f lines mustEqual Seq(ForwardCompatibleContextSpec.expected.event) } diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/LinkClickEventSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/LinkClickEventSpec.scala index 2f3372c97..c55700d12 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/LinkClickEventSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/LinkClickEventSpec.scala @@ -26,7 +26,7 @@ object LinkClickEventSpec { object expected { - val path = "shredded-types/vendor=com.snowplowanalytics.snowplow/name=link_click/format=jsonschema/version=1-0-0" + val path = "vendor=com.snowplowanalytics.snowplow/name=link_click/format=json/model=1" val contents = s"""|{ |"schema":{ @@ -61,8 +61,8 @@ class LinkClickEventSpec extends Specification with ShredJobSpec { runShredJob(LinkClickEventSpec.lines) val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] - "transform the enriched event and store it in /atomic-events" in { - val Some((lines, f)) = readPartFile(dirs.output, "atomic-events") + "transform the enriched event and store it in atomic folder" in { + val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) expectedFiles += f lines mustEqual Seq(LinkClickEventSpec.expected.event) } diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/MultipleJsonsSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/MultipleJsonsSpec.scala index bd0d12ddd..a0f265b6e 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/MultipleJsonsSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/MultipleJsonsSpec.scala @@ -38,8 +38,8 @@ class MultipleJsonsSpec extends Specification with ShredJobSpec { runShredJob(MultipleJsonsSpec.lines) val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] - "transform the enriched event and store it in /atomic-events" in { - val Some((lines, f)) = readPartFile(dirs.output, "atomic-events") + "transform the enriched event and store it in atomic events folder" in { + val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) expectedFiles += f lines mustEqual Seq(MultipleJsonsSpec.expectedAtomicEvent) } diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/WebsitePageContextSpec.scala 
b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/WebsitePageContextSpec.scala index 57b8cc126..05036f3bb 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/WebsitePageContextSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/WebsitePageContextSpec.scala @@ -25,7 +25,7 @@ object WebsitePageContextSpec { ) object expected { - val path = "shredded-types/vendor=org.schema/name=WebPage/format=jsonschema/version=1-0-0" + val path = "vendor=org.schema/name=WebPage/format=json/model=1" val contents = s"""|{ |"schema":{ @@ -63,8 +63,8 @@ class WebsitePageContextSpec extends Specification with ShredJobSpec { runShredJob(WebsitePageContextSpec.lines) val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] - "transform the enriched event and store it in /atomic-events" in { - val Some((lines, f)) = readPartFile(dirs.output, "atomic-events") + "transform the enriched event and store it in atomic events folder" in { + val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) expectedFiles += f lines mustEqual Seq(WebsitePageContextSpec.expected.event) } diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/tabular/NewlineSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/tabular/NewlineSpec.scala index 09648605b..344942bc6 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/tabular/NewlineSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/tabular/NewlineSpec.scala @@ -29,8 +29,8 @@ class NewlineSpec extends Specification with ShredJobSpec { runShredJob(NewlineSpec.lines, tsv = true) val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] - "transform the enriched event and store it in /atomic-events" in { - val Some((lines, f)) = readPartFile(dirs.output, "atomic-events") + "transform the enriched event and store it in atomic events folder" in { + val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) expectedFiles += f lines mustEqual Seq(NewlineSpec.expected.event) } @@ -72,7 +72,7 @@ object NewlineSpec { ) object expected { - val contextPath = s"shredded-tsv/vendor=com.snowplowanalytics.snowplow/name=change_form/format=jsonschema/version=1" + val contextPath = s"vendor=com.snowplowanalytics.snowplow/name=change_form/format=tsv/model=1" val contextContents = "com.snowplowanalytics.snowplow\tchange_form\tjsonschema\t1-0-0\tdeadbeef-dead-beef-dead-0000beefdead\t1970-01-01 00:00:00.000\tevents\t[\"events\",\"change_form\"]\tevents\tb\ta\tTEXTAREA\t\\N\t\\N\tline 1 line2 column2" diff --git a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/tabular/TabularOutputSpec.scala b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/tabular/TabularOutputSpec.scala index 78c333264..7ac60f364 100644 --- a/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/tabular/TabularOutputSpec.scala +++ b/modules/shredder/src/test/scala/com/snowplowanalytics/snowplow/shredder/good/tabular/TabularOutputSpec.scala @@ -28,8 +28,8 @@ class TabularOutputSpec extends Specification with ShredJobSpec { runShredJob(TabularOutputSpec.lines, tsv = true) val expectedFiles = scala.collection.mutable.ArrayBuffer.empty[String] - "transform the enriched event and store it in /atomic-events" in { - val Some((lines, f)) = readPartFile(dirs.output, 
"atomic-events") + "transform the enriched event and store it in atomic events folder" in { + val Some((lines, f)) = readPartFile(dirs.output, AtomicFolder) expectedFiles += f lines mustEqual Seq(TabularOutputSpec.expected.event) } @@ -76,12 +76,12 @@ object TabularOutputSpec { ) object expected { - val contextPath = s"shredded-tsv/vendor=org.schema/name=WebPage/format=jsonschema/version=1" + val contextPath = s"vendor=org.schema/name=WebPage/format=tsv/model=1" val contextContents = "org.schema\tWebPage\tjsonschema\t1-0-0\t2b1b25a4-c0df-4859-8201-cf21492ad61b\t2014-05-29 18:16:35.000\tevents\t[\"events\",\"WebPage\"]\tevents\tJonathan Almeida\t[\"blog\",\"releases\"]\t\\N\t\\N\t2014-07-23T00:00:00Z\tblog\ten-US\t[\"snowplow\",\"analytics\",\"java\",\"jvm\",\"tracker\"]" - val eventPath = s"shredded-tsv/vendor=com.snowplowanalytics.snowplow/name=application_error/format=jsonschema/version=1" + val eventPath = s"vendor=com.snowplowanalytics.snowplow/name=application_error/format=tsv/model=1" val eventContents = "com.snowplowanalytics.snowplow\tapplication_error\tjsonschema\t1-0-2\t2b1b25a4-c0df-4859-8201-cf21492ad61b\t2014-05-29 18:16:35.000\tevents\t[\"events\",\"application_error\"]\tevents\tundefined is not a function\tJAVASCRIPT\tAbstractSingletonFactoryBean\t\\N\t1\t\\N\t\\N\t14\t\\N\t\\N\t\\N\tthis column should be last"