From 70d5aecf3f23a101d2b2376477894f51b4ae8726 Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Wed, 27 Feb 2019 16:20:42 +0200 Subject: [PATCH] Truncate string values exceeding column's limits (close #42) --- build.sbt | 3 +- .../Config.scala | 4 +- .../snowflake/core/ConfigSpec.scala | 20 +- project/BuildSettings.scala | 1 + project/Dependencies.scala | 2 + .../snowflake/transformer/Main.scala | 34 +- .../snowflake/transformer/Transformer.scala | 35 +- .../transformer/TransformerJob.scala | 11 +- .../transformer/TransformerSpec.scala | 507 ++++++++++++++++++ 9 files changed, 599 insertions(+), 18 deletions(-) create mode 100644 transformer/src/test/scala/com/snowplowanalytics/snowflake/transformer/TransformerSpec.scala diff --git a/build.sbt b/build.sbt index 6bb5d352..fe10b87f 100644 --- a/build.sbt +++ b/build.sbt @@ -45,7 +45,8 @@ lazy val transformer = project libraryDependencies ++= Seq( Dependencies.hadoop, Dependencies.spark, - Dependencies.sparkSql + Dependencies.sparkSql, + Dependencies.schemaDdl ) ++ commonDependencies ) .dependsOn(core) diff --git a/core/src/main/scala/com.snowplowanalytics.snowflake.core/Config.scala b/core/src/main/scala/com.snowplowanalytics.snowflake.core/Config.scala index edc35936..123b3133 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowflake.core/Config.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowflake.core/Config.scala @@ -76,7 +76,7 @@ object Config { case class CliLoaderConfiguration(command: Command, loaderConfig: Config, loaderVersion: String, skip: Set[SetupSteps], dryRun: Boolean) case class RawCliTransformer(loaderConfig: String, resolver: String, eventsManifestConfig: Option[String], inbatch: Boolean) - case class CliTransformerConfiguration(loaderConfig: Config, eventsManifestConfig: Option[DynamoDbConfig], inbatch: Boolean) + case class CliTransformerConfiguration(loaderConfig: Config, resolver: Resolver, eventsManifestConfig: Option[DynamoDbConfig], inbatch: Boolean) /** Available 
methods to authenticate Snowflake loading */ sealed trait AuthMethod @@ -170,7 +170,7 @@ object Config { .map { x => Some(x) } case None => Right(None) } - } yield CliTransformerConfiguration(config, eventsManifestConfig, rawConfig.inbatch) + } yield CliTransformerConfiguration(config, resolver, eventsManifestConfig, rawConfig.inbatch) } diff --git a/core/src/test/scala/com/snowplowanalytics/snowflake/core/ConfigSpec.scala b/core/src/test/scala/com/snowplowanalytics/snowflake/core/ConfigSpec.scala index 74889e6d..56e883db 100644 --- a/core/src/test/scala/com/snowplowanalytics/snowflake/core/ConfigSpec.scala +++ b/core/src/test/scala/com/snowplowanalytics/snowflake/core/ConfigSpec.scala @@ -13,9 +13,14 @@ package com.snowplowanalytics.snowflake.core import org.specs2.Specification + +import com.snowplowanalytics.iglu.client.Resolver +import com.snowplowanalytics.iglu.client.repositories.{HttpRepositoryRef, RepositoryRefConfig} + +import com.snowplowanalytics.snowplow.eventsmanifest.DynamoDbConfig + import com.snowplowanalytics.snowflake.core.Config.S3Folder.{coerce => s3} import com.snowplowanalytics.snowflake.core.Config.{CliLoaderConfiguration, CliTransformerConfiguration, SetupSteps} -import com.snowplowanalytics.snowplow.eventsmanifest.DynamoDbConfig class ConfigSpec extends Specification { def is = @@ -310,6 +315,19 @@ class ConfigSpec extends Specification { database = "test_db", maxError = None, jdbcHost = None), + Resolver( + cacheSize = 5, + repos = List( + HttpRepositoryRef( + config = RepositoryRefConfig( + name = "Iglu Central base64", + instancePriority = 0, + vendorPrefixes = List("com.snowplowanalytics") + ), + uri = "http://iglucentral.com", + apikey = None + )) + ), Some(DynamoDbConfig( name = "local", auth = Some(DynamoDbConfig.CredentialsAuth( diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index 3ce3e87b..4f92a5ec 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -101,6 +101,7 @@ object 
BuildSettings { case x if x.endsWith(".html") => MergeStrategy.discard case x if x.endsWith("public-suffix-list.txt") => MergeStrategy.last case PathList("org", "apache", "spark", "unused", tail@_*) => MergeStrategy.first + case PathList("com", "github", "fge", tail@_*) => MergeStrategy.first case x => val oldStrategy = (assemblyMergeStrategy in assembly).value oldStrategy(x) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b7eb44b2..cdb85754 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -28,6 +28,7 @@ object Dependencies { val enumeratum = "1.5.13" val igluClient = "0.5.0" val eventsManifest = "0.1.0" + val schemaDdl = "0.9.0" // Scala (test only) val specs2 = "2.3.13" val scalazSpecs2 = "0.2" @@ -52,6 +53,7 @@ object Dependencies { val enumeratum = "com.beachape" %% "enumeratum" % V.enumeratum val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % V.igluClient val eventsManifest = "com.snowplowanalytics" %% "snowplow-events-manifest" % V.eventsManifest + val schemaDdl = "com.snowplowanalytics" %% "schema-ddl" % V.schemaDdl // Scala (test only) val specs2 = "org.specs2" %% "specs2" % V.specs2 % "test" diff --git a/transformer/src/main/scala/com/snowplowanalytics/snowflake/transformer/Main.scala b/transformer/src/main/scala/com/snowplowanalytics/snowflake/transformer/Main.scala index 6c07c569..927eb858 100644 --- a/transformer/src/main/scala/com/snowplowanalytics/snowflake/transformer/Main.scala +++ b/transformer/src/main/scala/com/snowplowanalytics/snowflake/transformer/Main.scala @@ -12,15 +12,22 @@ */ package com.snowplowanalytics.snowflake.transformer -import com.snowplowanalytics.snowflake.core.{Config, ProcessManifest} +import org.json4s.jackson.JsonMethods._ import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession +import scalaz.{Failure, Success} + +import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema +import 
com.snowplowanalytics.iglu.schemaddl.jsonschema.json4s.implicits.json4sToSchema + +import com.snowplowanalytics.snowflake.core.{Config, ProcessManifest} + object Main { def main(args: Array[String]): Unit = { Config.parseTransformerCli(args) match { - case Some(Right(Config.CliTransformerConfiguration(appConfig, eventsManifestConfig, inbatch))) => + case Some(Right(Config.CliTransformerConfiguration(appConfig, resolver, eventsManifestConfig, inbatch))) => // Always use EMR Role role for manifest-access val s3 = ProcessManifest.getS3(appConfig.awsRegion) @@ -37,10 +44,31 @@ object Main { // Get run folders that are not in RunManifest in any form val runFolders = manifest.getUnprocessed(appConfig.manifest, appConfig.input) + // Get Atomic schema from Iglu + val atomic = resolver.lookupSchema("iglu:com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0") match { + case Success(jsonSchema) => Schema.parse(fromJsonNode(jsonSchema)) match { + case Some(schema) => schema.properties.map { properties => properties.value.mapValues { property => + property.maxLength.map {_.value.intValue} + }} match { + case Some(schemaMap) => schemaMap.map(identity) /* copy into a strict Map: mapValues returns a lazy, non-serializable view (SI-7005) and this map is handed to the Spark job */ + case None => + println("Could not convert atomic event schema to map") + sys.exit(1) + } + case None => + println("Atomic event schema was invalid") + sys.exit(1) + } + case Failure(error) => + println("Cannot get atomic event schema") + println(error) + sys.exit(1) + } + runFolders match { case Right(folders) => val configs = folders.map(TransformerJobConfig(appConfig.input, appConfig.stageUrl, _)) - TransformerJob.run(spark, manifest, appConfig.manifest, configs, eventsManifestConfig, inbatch) + TransformerJob.run(spark, manifest, appConfig.manifest, configs, eventsManifestConfig, inbatch, atomic) case Left(error) => println("Cannot get list of unprocessed folders") println(error) diff --git a/transformer/src/main/scala/com/snowplowanalytics/snowflake/transformer/Transformer.scala
b/transformer/src/main/scala/com/snowplowanalytics/snowflake/transformer/Transformer.scala index 16ab837b..6b9e22d6 100644 --- a/transformer/src/main/scala/com/snowplowanalytics/snowflake/transformer/Transformer.scala +++ b/transformer/src/main/scala/com/snowplowanalytics/snowflake/transformer/Transformer.scala @@ -15,12 +15,18 @@ package com.snowplowanalytics.snowflake.transformer // java import java.util.UUID +// circe +import io.circe.Json +import io.circe.syntax._ + // cats import cats.data.Validated.{Invalid, Valid} +// schema-ddl +import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema + // scala-analytics-sdk -import com.snowplowanalytics.snowplow.analytics.scalasdk.Event -import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent +import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, SnowplowEvent} // events-manifest import com.snowplowanalytics.snowplow.eventsmanifest.EventsManifest @@ -38,7 +44,7 @@ object Transformer { * @param eventsManifestConfig events manifest config instance * @return pair of set with column names and JValue */ - def transform(event: Event, eventsManifestConfig: Option[EventsManifestConfig]): Option[(Set[String], String)] = { + def transform(event: Event, eventsManifestConfig: Option[EventsManifestConfig], atomic: Map[String, Option[Int]]): Option[(Set[String], String)] = { val shredTypes = event.inventory.map(item => SnowplowEvent.transformSchema(item.shredProperty, item.schemaKey)) val eventId = event.event_id.toString val eventFingerprint = event.event_fingerprint.getOrElse(UUID.randomUUID().toString) @@ -48,9 +54,9 @@ object Transformer { EventsManifestSingleton.get(eventsManifestConfig) match { case Some(manifest) => if (manifest.put(eventId, eventFingerprint, etlTstamp)) { - Some((shredTypes, event.toJson(true).noSpaces)) + Some((shredTypes, truncateFields(event.toJson(true), atomic).noSpaces)) } else None - case None => Some((shredTypes, event.toJson(true).noSpaces)) + case None => 
Some((shredTypes, truncateFields(event.toJson(true), atomic).noSpaces)) } } @@ -63,8 +69,23 @@ object Transformer { def jsonify(line: String): Event = { Event.parse(line) match { case Valid(event) => event - case Invalid(e) => - throw new RuntimeException(e.toList.mkString("\n")) + case Invalid(e) => throw new RuntimeException(e.toList.mkString("\n")) } } + + /** + * Truncate top-level string fields of a Snowplow event JSON to the maximum length listed for that key in `atomic`; non-string values, keys absent from `atomic` and keys mapped to None pass through unchanged. Throws RuntimeException if `eventJson` is not a JSON object. NOTE(review): length is measured with String.length (UTF-16 code units), so a cut can land inside a surrogate pair - confirm this is acceptable for the target column + */ + def truncateFields(eventJson: Json, atomic: Map[String, Option[Int]]): Json = { + Json.fromFields(eventJson.asObject.getOrElse(throw new RuntimeException(s"Event JSON is not an object? $eventJson")).toList.map { + case (key, value) if value.isString => + atomic.get(key) match { + case Some(Some(length)) => (key, value.asString.map { s => + (if (s.length > length) s.take(length) else s).asJson + }.getOrElse(value)) + case _ => (key, value) + } + case other => other + }) + } } diff --git a/transformer/src/main/scala/com/snowplowanalytics/snowflake/transformer/TransformerJob.scala b/transformer/src/main/scala/com/snowplowanalytics/snowflake/transformer/TransformerJob.scala index 372a25eb..76e1e254 100644 --- a/transformer/src/main/scala/com/snowplowanalytics/snowflake/transformer/TransformerJob.scala +++ b/transformer/src/main/scala/com/snowplowanalytics/snowflake/transformer/TransformerJob.scala @@ -14,17 +14,19 @@ package com.snowplowanalytics.snowflake.transformer import org.apache.spark.sql.{SaveMode, SparkSession} +import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema + import com.snowplowanalytics.snowflake.core.ProcessManifest import com.snowplowanalytics.snowplow.eventsmanifest.EventsManifest.EventsManifestConfig object TransformerJob { /** Process all directories, saving state into DynamoDB */ - def run(spark: SparkSession, manifest: ProcessManifest, tableName: String, jobConfigs: List[TransformerJobConfig], eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean): Unit = { + def run(spark: SparkSession,
manifest: ProcessManifest, tableName: String, jobConfigs: List[TransformerJobConfig], eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean, atomic: Map[String, Option[Int]]): Unit = { jobConfigs.foreach { jobConfig => println(s"Snowflake Transformer: processing ${jobConfig.runId}. ${System.currentTimeMillis()}") manifest.add(tableName, jobConfig.runId) - val shredTypes = process(spark, jobConfig, eventsManifestConfig, inbatch) + val shredTypes = process(spark, jobConfig, eventsManifestConfig, inbatch, atomic) manifest.markProcessed(tableName, jobConfig.runId, shredTypes, jobConfig.output) println(s"Snowflake Transformer: processed ${jobConfig.runId}. ${System.currentTimeMillis()}") } @@ -38,9 +40,10 @@ object TransformerJob { * @param jobConfig configuration with paths * @param eventsManifestConfig events manifest config instance * @param inbatch whether inbatch deduplication should be used + * @param atomic map of field names to maximum lengths * @return list of discovered shredded types */ - def process(spark: SparkSession, jobConfig: TransformerJobConfig, eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean) = { + def process(spark: SparkSession, jobConfig: TransformerJobConfig, eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean, atomic: Map[String, Option[Int]]) = { import spark.implicits._ val sc = spark.sparkContext @@ -56,7 +59,7 @@ object TransformerJob { .flatMap { case (_, vs) => vs.take(1) } } else events val snowflake = dedupedEvents.flatMap { e => - Transformer.transform(e, eventsManifestConfig) match { + Transformer.transform(e, eventsManifestConfig, atomic) match { case Some((keys, transformed)) => keysAggregator.add(keys) Some(transformed) diff --git a/transformer/src/test/scala/com/snowplowanalytics/snowflake/transformer/TransformerSpec.scala b/transformer/src/test/scala/com/snowplowanalytics/snowflake/transformer/TransformerSpec.scala new file mode 100644 index 00000000..aa62bd55 ---
/dev/null +++ b/transformer/src/test/scala/com/snowplowanalytics/snowflake/transformer/TransformerSpec.scala @@ -0,0 +1,507 @@ +/* + * Copyright (c) 2017 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowflake.transformer + +// java +import java.time.Instant +import java.util.UUID + +// json4s +import org.json4s.jackson.JsonMethods.fromJsonNode + +// scalaz +import scalaz.{Failure, Success} + +// circe +import io.circe.{Json, JsonObject} +import io.circe.syntax._ +import io.circe.parser._ + +// specs2 +import org.specs2.Specification + +// scala-analytics-sdk +import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{Contexts, UnstructEvent} + +// iglu +import com.snowplowanalytics.iglu.client.Resolver +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema +import com.snowplowanalytics.iglu.schemaddl.jsonschema.json4s.implicits.json4sToSchema + +// This library +import com.snowplowanalytics.snowflake.core.Config + +class TransformerSpec extends Specification { + + def is = + s2""" + Correctly truncate event fields $e1 + Correctly transform event to shredded key/JSON pair $e2 + """ + + val resolverBase64 = 
"eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5pZ2x1L3Jlc29sdmVyLWNvbmZpZy9qc29uc2NoZW1hLzEtMC0xIiwiZGF0YSI6eyJjYWNoZVNpemUiOjUsInJlcG9zaXRvcmllcyI6W3sibmFtZSI6IklnbHUgQ2VudHJhbCBiYXNlNjQiLCJwcmlvcml0eSI6MCwidmVuZG9yUHJlZml4ZXMiOlsiY29tLnNub3dwbG93YW5hbHl0aWNzIl0sImNvbm5lY3Rpb24iOnsiaHR0cCI6eyJ1cmkiOiJodHRwOi8vaWdsdWNlbnRyYWwuY29tIn19fV19fQ==" + + val resolver = Config.parseJsonFile(resolverBase64, true) match { + case Right(r) => Resolver.parse(r) match { + case Success(pr) => pr + case Failure(e) => throw new RuntimeException(e.toString) + } + case Left(e) => throw new RuntimeException(e) + } + + val atomicSchema = resolver.lookupSchema("iglu:com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0") match { + case Success(jsonSchema) => Schema.parse(fromJsonNode(jsonSchema)) match { + case Some(schema) => schema + case None => throw new RuntimeException("Atomic schema was invalid") + } + case Failure(e) => throw new RuntimeException(e.toString) + } + + val atomic = atomicSchema.properties.map { properties => properties.value.mapValues { property => + property.maxLength.map {_.value.intValue} + }}.getOrElse(throw new RuntimeException("Could not convert schema to map")) + + val event = Event( + app_id = Some("angry-birds"), + platform = Some("web"), + etl_tstamp = Some(Instant.parse("2017-01-26T00:01:25.292Z")), + collector_tstamp = Instant.parse("2013-11-26T00:02:05Z"), + dvce_created_tstamp = Some(Instant.parse("2013-11-26T00:03:57.885Z")), + event = Some("page_view"), + event_id = UUID.fromString("c6ef3124-b53a-4b13-a233-0088f79dcbcb"), + txn_id = Some(41828), + name_tracker = Some("cloudfront-1"), + v_tracker = Some("js-2.1.0"), + v_collector = "clj-tomcat-0.1.0", + v_etl = "serde-0.5.2", + user_id = Some("jon.doe@email.com"), + user_ipaddress = Some("92.231.54.234"), + user_fingerprint = Some("2161814971"), + domain_userid = Some("bc2e92ec6c204a14"), + domain_sessionidx = Some(3), + network_userid = Some("ecdff4d0-9175-40ac-a8bb-325c49733607"), + 
geo_country = Some("US"), + geo_region = Some("TX"), + geo_city = Some("New York"), + geo_zipcode = Some("94109"), + geo_latitude = Some(37.443604), + geo_longitude = Some(-122.4124), + geo_region_name = Some("Florida"), + ip_isp = Some("FDN Communications"), + ip_organization = Some("Bouygues Telecom"), + ip_domain = Some("nuvox.net"), + ip_netspeed = Some("Cable/DSL"), + page_url = Some("http://www.snowplowanalytics.com"), + page_title = Some("On Analytics"), + page_referrer = None, + page_urlscheme = Some("http"), + page_urlhost = Some("www.snowplowanalytics.com"), + page_urlport = Some(80), + page_urlpath = Some("/product/index.html"), + page_urlquery = Some("id=GTM-DLRG"), + page_urlfragment = Some("4-conclusion"), + refr_urlscheme = None, + refr_urlhost = None, + refr_urlport = None, + refr_urlpath = None, + refr_urlquery = None, + refr_urlfragment = None, + refr_medium = None, + refr_source = None, + refr_term = None, + mkt_medium = None, + mkt_source = None, + mkt_term = None, + mkt_content = None, + mkt_campaign = None, + contexts = Contexts( + List( + SelfDescribingData( + SchemaKey( + "org.schema", + "WebPage", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ), + JsonObject( + ("genre", "blog".asJson), + ("inLanguage", "en-US".asJson), + ("datePublished", "2014-11-06T00:00:00Z".asJson), + ("author", "Fred Blundun".asJson), + ("breadcrumb", List("blog", "releases").asJson), + ("keywords", List("snowplow", "javascript", "tracker", "event").asJson) + ).asJson + ), + SelfDescribingData( + SchemaKey( + "org.w3", + "PerformanceTiming", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ), + JsonObject( + ("navigationStart", 1415358089861L.asJson), + ("unloadEventStart", 1415358090270L.asJson), + ("unloadEventEnd", 1415358090287L.asJson), + ("redirectStart", 0.asJson), + ("redirectEnd", 0.asJson), + ("fetchStart", 1415358089870L.asJson), + ("domainLookupStart", 1415358090102L.asJson), + ("domainLookupEnd", 1415358090102L.asJson), + ("connectStart", 1415358090103L.asJson), 
+ ("connectEnd", 1415358090183L.asJson), + ("requestStart", 1415358090183L.asJson), + ("responseStart", 1415358090265L.asJson), + ("responseEnd", 1415358090265L.asJson), + ("domLoading", 1415358090270L.asJson), + ("domInteractive", 1415358090886L.asJson), + ("domContentLoadedEventStart", 1415358090968L.asJson), + ("domContentLoadedEventEnd", 1415358091309L.asJson), + ("domComplete", 0.asJson), + ("loadEventStart", 0.asJson), + ("loadEventEnd", 0.asJson) + ).asJson + ) + ) + ), + se_category = None, + se_action = None, + se_label = None, + se_property = None, + se_value = None, + unstruct_event = UnstructEvent( + Some( + SelfDescribingData( + SchemaKey( + "com.snowplowanalytics.snowplow", + "link_click", + "jsonschema", + SchemaVer.Full(1, 0, 1) + ), + JsonObject( + ("targetUrl", "http://www.example.com".asJson), + ("elementClasses", List("foreground").asJson), + ("elementId", "exampleLink".asJson) + ).asJson + ) + ) + ), + tr_orderid = None, + tr_affiliation = None, + tr_total = None, + tr_tax = None, + tr_shipping = None, + tr_city = None, + tr_state = None, + tr_country = None, + ti_orderid = None, + ti_sku = None, + ti_name = None, + ti_category = None, + ti_price = None, + ti_quantity = None, + pp_xoffset_min = None, + pp_xoffset_max = None, + pp_yoffset_min = None, + pp_yoffset_max = None, + useragent = None, + br_name = None, + br_family = None, + br_version = None, + br_type = None, + br_renderengine = None, + br_lang = None, + br_features_pdf = Some(true), + br_features_flash = Some(false), + br_features_java = None, + br_features_director = None, + br_features_quicktime = None, + br_features_realplayer = None, + br_features_windowsmedia = None, + br_features_gears = None, + br_features_silverlight = None, + br_cookies = None, + br_colordepth = None, + br_viewwidth = None, + br_viewheight = None, + os_name = None, + os_family = None, + os_manufacturer = None, + os_timezone = None, + dvce_type = None, + dvce_ismobile = None, + dvce_screenwidth = None, + 
dvce_screenheight = None, + doc_charset = None, + doc_width = None, + doc_height = None, + tr_currency = None, + tr_total_base = None, + tr_tax_base = None, + tr_shipping_base = None, + ti_currency = None, + ti_price_base = None, + base_currency = None, + geo_timezone = None, + mkt_clickid = None, + mkt_network = None, + etl_tags = None, + dvce_sent_tstamp = None, + refr_domain_userid = None, + refr_device_tstamp = None, + derived_contexts = Contexts( + List( + SelfDescribingData( + SchemaKey( + "com.snowplowanalytics.snowplow", + "ua_parser_context", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ), + JsonObject( + ("useragentFamily", "IE".asJson), + ("useragentMajor", "7".asJson), + ("useragentMinor", "0".asJson), + ("useragentPatch", Json.Null), + ("useragentVersion", "IE 7.0".asJson), + ("osFamily", "Windows XP".asJson), + ("osMajor", Json.Null), + ("osMinor", Json.Null), + ("osPatch", Json.Null), + ("osPatchMinor", Json.Null), + ("osVersion", "Windows XP".asJson), + ("deviceFamily", "Other".asJson) + ).asJson + ) + ) + ), + domain_sessionid = Some("2b15e5c8-d3b1-11e4-b9d6-1681e6b88ec1"), + derived_tstamp = Some(Instant.parse("2013-11-26T00:03:57.886Z")), + event_vendor = Some("com.snowplowanalytics.snowplow"), + event_name = Some("link_click"), + event_format = Some("jsonschema"), + event_version = Some("1-0-0"), + event_fingerprint = Some("e3dbfa9cca0412c3d4052863cefb547f"), + true_tstamp = Some(Instant.parse("2013-11-26T00:03:57.886Z")) + ) + + def e1 = { + // truncateFields must not break event structure if all fields are in order + Transformer.truncateFields(event.toJson(true), atomic) mustEqual event.toJson(true) + + // truncateFields must truncate specific long fields to length from atomic schema + Transformer.truncateFields(event.copy(v_collector = "a" * 200).toJson(true), atomic) mustEqual event.copy(v_collector = "a" * 100).toJson(true) + Transformer.truncateFields(event.copy(geo_region = Some("12345")).toJson(true), atomic) mustEqual 
event.copy(geo_region = Some("123")).toJson(true) + Transformer.truncateFields(event.toJson(true), atomic + ("app_id" -> Some(5))) mustEqual event.copy(app_id = Some("angry")).toJson(true) + } + + + def e2 = { + val transformed = Transformer.transform(event, None, atomic).getOrElse(throw new RuntimeException("Transformed event must be Some")) + + // transform must have valid shredded types + transformed._1 mustEqual Set( + "contexts_org_schema_web_page_1", + "contexts_org_w3_performance_timing_1", + "contexts_com_snowplowanalytics_snowplow_ua_parser_context_1", + "unstruct_event_com_snowplowanalytics_snowplow_link_click_1" + ) + + val expectedJson = parse("""{ + "geo_location" : "37.443604,-122.4124", + "app_id" : "angry-birds", + "platform" : "web", + "etl_tstamp" : "2017-01-26T00:01:25.292Z", + "collector_tstamp" : "2013-11-26T00:02:05Z", + "dvce_created_tstamp" : "2013-11-26T00:03:57.885Z", + "event" : "page_view", + "event_id" : "c6ef3124-b53a-4b13-a233-0088f79dcbcb", + "txn_id" : 41828, + "name_tracker" : "cloudfront-1", + "v_tracker" : "js-2.1.0", + "v_collector" : "clj-tomcat-0.1.0", + "v_etl" : "serde-0.5.2", + "user_id" : "jon.doe@email.com", + "user_ipaddress" : "92.231.54.234", + "user_fingerprint" : "2161814971", + "domain_userid" : "bc2e92ec6c204a14", + "domain_sessionidx" : 3, + "network_userid" : "ecdff4d0-9175-40ac-a8bb-325c49733607", + "geo_country" : "US", + "geo_region" : "TX", + "geo_city" : "New York", + "geo_zipcode" : "94109", + "geo_latitude" : 37.443604, + "geo_longitude" : -122.4124, + "geo_region_name" : "Florida", + "ip_isp" : "FDN Communications", + "ip_organization" : "Bouygues Telecom", + "ip_domain" : "nuvox.net", + "ip_netspeed" : "Cable/DSL", + "page_url" : "http://www.snowplowanalytics.com", + "page_title" : "On Analytics", + "page_referrer" : null, + "page_urlscheme" : "http", + "page_urlhost" : "www.snowplowanalytics.com", + "page_urlport" : 80, + "page_urlpath" : "/product/index.html", + "page_urlquery" : "id=GTM-DLRG", + 
"page_urlfragment" : "4-conclusion", + "refr_urlscheme" : null, + "refr_urlhost" : null, + "refr_urlport" : null, + "refr_urlpath" : null, + "refr_urlquery" : null, + "refr_urlfragment" : null, + "refr_medium" : null, + "refr_source" : null, + "refr_term" : null, + "mkt_medium" : null, + "mkt_source" : null, + "mkt_term" : null, + "mkt_content" : null, + "mkt_campaign" : null, + "contexts_org_schema_web_page_1" : [ { + "genre" : "blog", + "inLanguage" : "en-US", + "datePublished" : "2014-11-06T00:00:00Z", + "author" : "Fred Blundun", + "breadcrumb" : [ "blog", "releases" ], + "keywords" : [ "snowplow", "javascript", "tracker", "event" ] + } ], + "contexts_org_w3_performance_timing_1" : [ { + "navigationStart" : 1415358089861, + "unloadEventStart" : 1415358090270, + "unloadEventEnd" : 1415358090287, + "redirectStart" : 0, + "redirectEnd" : 0, + "fetchStart" : 1415358089870, + "domainLookupStart" : 1415358090102, + "domainLookupEnd" : 1415358090102, + "connectStart" : 1415358090103, + "connectEnd" : 1415358090183, + "requestStart" : 1415358090183, + "responseStart" : 1415358090265, + "responseEnd" : 1415358090265, + "domLoading" : 1415358090270, + "domInteractive" : 1415358090886, + "domContentLoadedEventStart" : 1415358090968, + "domContentLoadedEventEnd" : 1415358091309, + "domComplete" : 0, + "loadEventStart" : 0, + "loadEventEnd" : 0 + } ], + "se_category" : null, + "se_action" : null, + "se_label" : null, + "se_property" : null, + "se_value" : null, + "unstruct_event_com_snowplowanalytics_snowplow_link_click_1" : { + "targetUrl" : "http://www.example.com", + "elementClasses" : [ "foreground" ], + "elementId" : "exampleLink" + }, + "tr_orderid" : null, + "tr_affiliation" : null, + "tr_total" : null, + "tr_tax" : null, + "tr_shipping" : null, + "tr_city" : null, + "tr_state" : null, + "tr_country" : null, + "ti_orderid" : null, + "ti_sku" : null, + "ti_name" : null, + "ti_category" : null, + "ti_price" : null, + "ti_quantity" : null, + "pp_xoffset_min" : null, + 
"pp_xoffset_max" : null, + "pp_yoffset_min" : null, + "pp_yoffset_max" : null, + "useragent" : null, + "br_name" : null, + "br_family" : null, + "br_version" : null, + "br_type" : null, + "br_renderengine" : null, + "br_lang" : null, + "br_features_pdf" : true, + "br_features_flash" : false, + "br_features_java" : null, + "br_features_director" : null, + "br_features_quicktime" : null, + "br_features_realplayer" : null, + "br_features_windowsmedia" : null, + "br_features_gears" : null, + "br_features_silverlight" : null, + "br_cookies" : null, + "br_colordepth" : null, + "br_viewwidth" : null, + "br_viewheight" : null, + "os_name" : null, + "os_family" : null, + "os_manufacturer" : null, + "os_timezone" : null, + "dvce_type" : null, + "dvce_ismobile" : null, + "dvce_screenwidth" : null, + "dvce_screenheight" : null, + "doc_charset" : null, + "doc_width" : null, + "doc_height" : null, + "tr_currency" : null, + "tr_total_base" : null, + "tr_tax_base" : null, + "tr_shipping_base" : null, + "ti_currency" : null, + "ti_price_base" : null, + "base_currency" : null, + "geo_timezone" : null, + "mkt_clickid" : null, + "mkt_network" : null, + "etl_tags" : null, + "dvce_sent_tstamp" : null, + "refr_domain_userid" : null, + "refr_device_tstamp" : null, + "contexts_com_snowplowanalytics_snowplow_ua_parser_context_1": [{ + "useragentFamily": "IE", + "useragentMajor": "7", + "useragentMinor": "0", + "useragentPatch": null, + "useragentVersion": "IE 7.0", + "osFamily": "Windows XP", + "osMajor": null, + "osMinor": null, + "osPatch": null, + "osPatchMinor": null, + "osVersion": "Windows XP", + "deviceFamily": "Other" + }], + "domain_sessionid": "2b15e5c8-d3b1-11e4-b9d6-1681e6b88ec1", + "derived_tstamp": "2013-11-26T00:03:57.886Z", + "event_vendor": "com.snowplowanalytics.snowplow", + "event_name": "link_click", + "event_format": "jsonschema", + "event_version": "1-0-0", + "event_fingerprint": "e3dbfa9cca0412c3d4052863cefb547f", + "true_tstamp": "2013-11-26T00:03:57.886Z" + }""") + 
+ // transform must have atomic event with fields identical to expected + // (but not necessarily the same field ordering) + parse(transformed._2) mustEqual expectedJson + } +}