Truncate string values exceeding column's limits (close #42)
rzats committed Mar 1, 2019
1 parent f7b1db3 commit cd8097c
Showing 9 changed files with 595 additions and 18 deletions.
3 changes: 2 additions & 1 deletion build.sbt
@@ -45,7 +45,8 @@ lazy val transformer = project
     libraryDependencies ++= Seq(
       Dependencies.hadoop,
       Dependencies.spark,
-      Dependencies.sparkSql
+      Dependencies.sparkSql,
+      Dependencies.schemaDdl
     ) ++ commonDependencies
   )
   .dependsOn(core)

@@ -76,7 +76,7 @@ object Config {
   case class CliLoaderConfiguration(command: Command, loaderConfig: Config, loaderVersion: String, skip: Set[SetupSteps], dryRun: Boolean)

   case class RawCliTransformer(loaderConfig: String, resolver: String, eventsManifestConfig: Option[String], inbatch: Boolean)
-  case class CliTransformerConfiguration(loaderConfig: Config, eventsManifestConfig: Option[DynamoDbConfig], inbatch: Boolean)
+  case class CliTransformerConfiguration(loaderConfig: Config, resolver: Resolver, eventsManifestConfig: Option[DynamoDbConfig], inbatch: Boolean)

   /** Available methods to authenticate Snowflake loading */
   sealed trait AuthMethod
@@ -170,7 +170,7 @@ object Config {
         .map { x => Some(x) }
       case None => Right(None)
     }
-  } yield CliTransformerConfiguration(config, eventsManifestConfig, rawConfig.inbatch)
+  } yield CliTransformerConfiguration(config, resolver, eventsManifestConfig, rawConfig.inbatch)
 }


@@ -13,9 +13,14 @@
 package com.snowplowanalytics.snowflake.core

 import org.specs2.Specification
+
+import com.snowplowanalytics.iglu.client.Resolver
+import com.snowplowanalytics.iglu.client.repositories.{HttpRepositoryRef, RepositoryRefConfig}
+
+import com.snowplowanalytics.snowplow.eventsmanifest.DynamoDbConfig
+
 import com.snowplowanalytics.snowflake.core.Config.S3Folder.{coerce => s3}
 import com.snowplowanalytics.snowflake.core.Config.{CliLoaderConfiguration, CliTransformerConfiguration, SetupSteps}
-import com.snowplowanalytics.snowplow.eventsmanifest.DynamoDbConfig

 class ConfigSpec extends Specification {
   def is =
@@ -310,6 +315,19 @@ class ConfigSpec extends Specification {
         database = "test_db",
         maxError = None,
         jdbcHost = None),
+      Resolver(
+        cacheSize = 5,
+        repos = List(
+          HttpRepositoryRef(
+            config = RepositoryRefConfig(
+              name = "Iglu Central base64",
+              instancePriority = 0,
+              vendorPrefixes = List("com.snowplowanalytics")
+            ),
+            uri = "http://iglucentral.com",
+            apikey = None
+          ))
+      ),
       Some(DynamoDbConfig(
         name = "local",
         auth = Some(DynamoDbConfig.CredentialsAuth(

1 change: 1 addition & 0 deletions project/BuildSettings.scala
@@ -101,6 +101,7 @@ object BuildSettings {
       case x if x.endsWith(".html") => MergeStrategy.discard
       case x if x.endsWith("public-suffix-list.txt") => MergeStrategy.last
       case PathList("org", "apache", "spark", "unused", tail@_*) => MergeStrategy.first
+      case PathList("com", "github", "fge", tail@_*) => MergeStrategy.first
       case x =>
         val oldStrategy = (assemblyMergeStrategy in assembly).value
         oldStrategy(x)

2 changes: 2 additions & 0 deletions project/Dependencies.scala
@@ -28,6 +28,7 @@ object Dependencies {
     val enumeratum = "1.5.13"
     val igluClient = "0.5.0"
     val eventsManifest = "0.1.0"
+    val schemaDdl = "0.9.0"
     // Scala (test only)
     val specs2 = "2.3.13"
     val scalazSpecs2 = "0.2"
@@ -52,6 +53,7 @@
   val enumeratum = "com.beachape" %% "enumeratum" % V.enumeratum
   val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % V.igluClient
   val eventsManifest = "com.snowplowanalytics" %% "snowplow-events-manifest" % V.eventsManifest
+  val schemaDdl = "com.snowplowanalytics" %% "schema-ddl" % V.schemaDdl

   // Scala (test only)
   val specs2 = "org.specs2" %% "specs2" % V.specs2 % "test"

@@ -12,15 +12,22 @@
  */
 package com.snowplowanalytics.snowflake.transformer

-import com.snowplowanalytics.snowflake.core.{Config, ProcessManifest}
+import org.json4s.jackson.JsonMethods._

 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession

+import scalaz.{Failure, Success}
+
+import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
+import com.snowplowanalytics.iglu.schemaddl.jsonschema.json4s.implicits.json4sToSchema
+
+import com.snowplowanalytics.snowflake.core.{Config, ProcessManifest}
+
 object Main {
   def main(args: Array[String]): Unit = {
     Config.parseTransformerCli(args) match {
-      case Some(Right(Config.CliTransformerConfiguration(appConfig, eventsManifestConfig, inbatch))) =>
+      case Some(Right(Config.CliTransformerConfiguration(appConfig, resolver, eventsManifestConfig, inbatch))) =>

         // Always use EMR Role role for manifest-access
         val s3 = ProcessManifest.getS3(appConfig.awsRegion)
@@ -37,10 +44,24 @@
         // Get run folders that are not in RunManifest in any form
         val runFolders = manifest.getUnprocessed(appConfig.manifest, appConfig.input)

+        // Get Atomic schema from Iglu
+        val atomic = resolver.lookupSchema("iglu:com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0") match {
+          case Success(jsonSchema) => Schema.parse(fromJsonNode(jsonSchema)) match {
+            case Some(schema) => schema
+            case None =>
+              println("Atomic event schema was invalid")
+              sys.exit(1)
+          }
+          case Failure(error) =>
+            println("Cannot get atomic event schema")
+            println(error)
+            sys.exit(1)
+        }
+
         runFolders match {
           case Right(folders) =>
             val configs = folders.map(TransformerJobConfig(appConfig.input, appConfig.stageUrl, _))
-            TransformerJob.run(spark, manifest, appConfig.manifest, configs, eventsManifestConfig, inbatch)
+            TransformerJob.run(spark, manifest, appConfig.manifest, configs, eventsManifestConfig, inbatch, atomic)
          case Left(error) =>
            println("Cannot get list of unprocessed folders")
            println(error)
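
For context on how the schema resolved above gets used: below is a rough, self-contained sketch (not part of this commit) of turning a parsed schema-ddl Schema into the per-column maxLength map that the Transformer relies on. It assumes schema-ddl 0.9.0 with its json4s bindings; the inline schema snippet and the object name are invented for illustration.

// Illustration only, not part of the commit: derive a column-name -> maxLength map
// from a JSON Schema, the way Transformer.transform does with the atomic schema.
import org.json4s.jackson.JsonMethods.parse

import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.json4s.implicits.json4sToSchema

object AtomicLimitsSketch extends App {
  // Invented stand-in for a few fields of the real atomic 1-0-0 schema
  val atomicJson = parse(
    """{
      |  "type": "object",
      |  "properties": {
      |    "platform":    {"type": "string", "maxLength": 255},
      |    "geo_country": {"type": "string", "maxLength": 2},
      |    "txn_id":      {"type": "integer"}
      |  }
      |}""".stripMargin)

  val limits: Map[String, Option[Int]] = Schema.parse(atomicJson) match {
    case Some(schema) =>
      // Fields without a maxLength end up as None and are left untouched later
      schema.properties
        .map(_.value.mapValues(_.maxLength.map(_.value.intValue)))
        .getOrElse(Map.empty)
    case None => sys.error("Not a valid JSON Schema")
  }

  println(limits) // e.g. Map(platform -> Some(255), geo_country -> Some(2), txn_id -> None)
}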

@@ -15,12 +15,18 @@ package com.snowplowanalytics.snowflake.transformer
 // java
 import java.util.UUID

+// circe
+import io.circe.Json
+import io.circe.syntax._
+
 // cats
 import cats.data.Validated.{Invalid, Valid}

+// schema-ddl
+import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
+
 // scala-analytics-sdk
-import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
-import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent
+import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, SnowplowEvent}

 // events-manifest
 import com.snowplowanalytics.snowplow.eventsmanifest.EventsManifest
@@ -38,19 +44,22 @@ object Transformer {
    * @param eventsManifestConfig events manifest config instance
    * @return pair of set with column names and JValue
    */
-  def transform(event: Event, eventsManifestConfig: Option[EventsManifestConfig]): Option[(Set[String], String)] = {
+  def transform(event: Event, eventsManifestConfig: Option[EventsManifestConfig], atomicSchema: Schema): Option[(Set[String], String)] = {
     val shredTypes = event.inventory.map(item => SnowplowEvent.transformSchema(item.shredProperty, item.schemaKey))
     val eventId = event.event_id.toString
     val eventFingerprint = event.event_fingerprint.getOrElse(UUID.randomUUID().toString)
     val etlTstamp = event.etl_tstamp.map(i => EventsManifest.RedshiftTstampFormatter.format(i))
       .getOrElse(throw new RuntimeException(s"etl_tstamp in event $eventId is empty or missing"))
+    val atomic = atomicSchema.properties.map { properties => properties.value.mapValues { property =>
+      property.maxLength.map {_.value.intValue}
+    }}.getOrElse(throw new RuntimeException(s"Could not convert atomic schema to property map"))

     EventsManifestSingleton.get(eventsManifestConfig) match {
       case Some(manifest) =>
         if (manifest.put(eventId, eventFingerprint, etlTstamp)) {
-          Some((shredTypes, event.toJson(true).noSpaces))
+          Some((shredTypes, truncateFields(event.toJson(true), atomic).noSpaces))
         } else None
-      case None => Some((shredTypes, event.toJson(true).noSpaces))
+      case None => Some((shredTypes, truncateFields(event.toJson(true), atomic).noSpaces))
     }
   }

@@ -63,8 +72,23 @@ object Transformer {
   def jsonify(line: String): Event = {
     Event.parse(line) match {
       case Valid(event) => event
-      case Invalid(e) =>
-        throw new RuntimeException(e.toList.mkString("\n"))
+      case Invalid(e) => throw new RuntimeException(e.toList.mkString("\n"))
     }
   }
+
+  /**
+    * Truncate a Snowplow event's fields based on atomic schema
+    */
+  def truncateFields(eventJson: Json, atomic: Map[String, Option[Int]]): Json = {
+    Json.fromFields(eventJson.asObject.getOrElse(throw new RuntimeException(s"Event JSON is not an object? $eventJson")).toList.map {
+      case (key, value) if value.isString =>
+        atomic.get(key) match {
+          case Some(Some(length)) => (key, value.asString.map { s =>
+            (if (s.length > length) s.take(length) else s).asJson
+          }.getOrElse(value))
+          case _ => (key, value)
+        }
+      case other => other
+    })
+  }
 }
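
To make the headline change concrete: a rough, standalone sketch (not part of this commit) of the truncation behaviour, using a simplified copy of truncateFields. circe is assumed, as in the code above; the limits map and the sample event are invented.

// Illustration only: string fields longer than their column limit are cut down,
// everything else passes through unchanged.
import io.circe.Json
import io.circe.syntax._

object TruncateSketch extends App {
  // Invented per-column limits, standing in for the map derived from the atomic schema
  val limits: Map[String, Option[Int]] = Map("v_tracker" -> Some(10), "platform" -> None)

  // Simplified version of Transformer.truncateFields
  def truncateFields(eventJson: Json, atomic: Map[String, Option[Int]]): Json =
    Json.fromFields(eventJson.asObject.toList.flatMap(_.toList).map {
      case (key, value) if value.isString =>
        atomic.get(key).flatten match {
          case Some(max) => (key, value.asString.map(_.take(max).asJson).getOrElse(value))
          case None      => (key, value)
        }
      case other => other
    })

  val event = Json.obj(
    "v_tracker"  -> "js-2.10.2-rc1-too-long".asJson,
    "platform"   -> "web".asJson,
    "br_cookies" -> true.asJson
  )

  println(truncateFields(event, limits).noSpaces)
  // {"v_tracker":"js-2.10.2-","platform":"web","br_cookies":true}
}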

@@ -14,17 +14,19 @@ package com.snowplowanalytics.snowflake.transformer

 import org.apache.spark.sql.{SaveMode, SparkSession}

+import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
+
 import com.snowplowanalytics.snowflake.core.ProcessManifest
 import com.snowplowanalytics.snowplow.eventsmanifest.EventsManifest.EventsManifestConfig

 object TransformerJob {

   /** Process all directories, saving state into DynamoDB */
-  def run(spark: SparkSession, manifest: ProcessManifest, tableName: String, jobConfigs: List[TransformerJobConfig], eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean): Unit = {
+  def run(spark: SparkSession, manifest: ProcessManifest, tableName: String, jobConfigs: List[TransformerJobConfig], eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean, atomicSchema: Schema): Unit = {
     jobConfigs.foreach { jobConfig =>
       println(s"Snowflake Transformer: processing ${jobConfig.runId}. ${System.currentTimeMillis()}")
       manifest.add(tableName, jobConfig.runId)
-      val shredTypes = process(spark, jobConfig, eventsManifestConfig, inbatch)
+      val shredTypes = process(spark, jobConfig, eventsManifestConfig, inbatch, atomicSchema)
       manifest.markProcessed(tableName, jobConfig.runId, shredTypes, jobConfig.output)
       println(s"Snowflake Transformer: processed ${jobConfig.runId}. ${System.currentTimeMillis()}")
     }
@@ -38,9 +40,10 @@
    * @param jobConfig configuration with paths
    * @param eventsManifestConfig events manifest config instance
    * @param inbatch whether inbatch deduplication should be used
+   * @param atomicSchema map of field names to maximum lengths
    * @return list of discovered shredded types
    */
-  def process(spark: SparkSession, jobConfig: TransformerJobConfig, eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean) = {
+  def process(spark: SparkSession, jobConfig: TransformerJobConfig, eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean, atomicSchema: Schema) = {
     import spark.implicits._

     val sc = spark.sparkContext
@@ -56,7 +59,7 @@
           .flatMap { case (_, vs) => vs.take(1) }
       } else events
     val snowflake = dedupedEvents.flatMap { e =>
-      Transformer.transform(e, eventsManifestConfig) match {
+      Transformer.transform(e, eventsManifestConfig, atomicSchema) match {
         case Some((keys, transformed)) =>
           keysAggregator.add(keys)
           Some(transformed)