This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Truncate string values exceeding column's limits (close #42)
rzats committed Feb 27, 2019
1 parent f7b1db3 commit 0ac0a76
Showing 7 changed files with 82 additions and 13 deletions.
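
In short, any string field longer than the maxLength declared for it in the atomic event JSON schema is cut down to that limit before loading. A minimal sketch of the idea (illustrative only, not the commit's code; the object name, example value and limit are made up):

object TruncateSketch {
  // Cap a string at the maxLength declared for its column, if any
  def truncate(value: String, maxLength: Option[Int]): String =
    maxLength.fold(value)(value.take)

  def main(args: Array[String]): Unit = {
    println(truncate("x" * 300, Some(255)).length) // 255: the value is cut to the declared limit
    println(truncate("short", None))               // "short": unchanged when no limit is declared
  }
}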
1 change: 1 addition & 0 deletions build.sbt
@@ -61,6 +61,7 @@ lazy val commonDependencies = Seq(
Dependencies.enumeratum,
Dependencies.igluClient,
Dependencies.eventsManifest,
Dependencies.schemaDdl,
// Scala (test-only)
Dependencies.specs2,
Dependencies.scalazSpecs2,
Config.scala
@@ -76,7 +76,7 @@ object Config {
case class CliLoaderConfiguration(command: Command, loaderConfig: Config, loaderVersion: String, skip: Set[SetupSteps], dryRun: Boolean)

case class RawCliTransformer(loaderConfig: String, resolver: String, eventsManifestConfig: Option[String], inbatch: Boolean)
case class CliTransformerConfiguration(loaderConfig: Config, eventsManifestConfig: Option[DynamoDbConfig], inbatch: Boolean)
case class CliTransformerConfiguration(loaderConfig: Config, resolver: Resolver, eventsManifestConfig: Option[DynamoDbConfig], inbatch: Boolean)

/** Available methods to authenticate Snowflake loading */
sealed trait AuthMethod
@@ -170,7 +170,7 @@ object Config {
.map { x => Some(x) }
case None => Right(None)
}
} yield CliTransformerConfiguration(config, eventsManifestConfig, rawConfig.inbatch)
} yield CliTransformerConfiguration(config, resolver, eventsManifestConfig, rawConfig.inbatch)
}


ConfigSpec.scala
@@ -13,9 +13,14 @@
package com.snowplowanalytics.snowflake.core

import org.specs2.Specification

import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.repositories.{HttpRepositoryRef, RepositoryRefConfig}

import com.snowplowanalytics.snowplow.eventsmanifest.DynamoDbConfig

import com.snowplowanalytics.snowflake.core.Config.S3Folder.{coerce => s3}
import com.snowplowanalytics.snowflake.core.Config.{CliLoaderConfiguration, CliTransformerConfiguration, SetupSteps}
import com.snowplowanalytics.snowplow.eventsmanifest.DynamoDbConfig

class ConfigSpec extends Specification {
def is =
@@ -310,6 +315,19 @@ class ConfigSpec extends Specification {
database = "test_db",
maxError = None,
jdbcHost = None),
Resolver(
cacheSize = 5,
repos = List(
HttpRepositoryRef(
config = RepositoryRefConfig(
name = "Iglu Central base64",
instancePriority = 0,
vendorPrefixes = List("com.snowplowanalytics")
),
uri = "http://iglucentral.com",
apikey = None
))
),
Some(DynamoDbConfig(
name = "local",
auth = Some(DynamoDbConfig.CredentialsAuth(
2 changes: 2 additions & 0 deletions project/Dependencies.scala
@@ -28,6 +28,7 @@ object Dependencies {
val enumeratum = "1.5.13"
val igluClient = "0.5.0"
val eventsManifest = "0.1.0"
val schemaDdl = "0.9.0"
// Scala (test only)
val specs2 = "2.3.13"
val scalazSpecs2 = "0.2"
@@ -52,6 +53,7 @@
val enumeratum = "com.beachape" %% "enumeratum" % V.enumeratum
val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % V.igluClient
val eventsManifest = "com.snowplowanalytics" %% "snowplow-events-manifest" % V.eventsManifest
val schemaDdl = "com.snowplowanalytics" %% "schema-ddl" % V.schemaDdl

// Scala (test only)
val specs2 = "org.specs2" %% "specs2" % V.specs2 % "test"
Main.scala
@@ -12,15 +12,22 @@
*/
package com.snowplowanalytics.snowflake.transformer

import com.snowplowanalytics.snowflake.core.{Config, ProcessManifest}
import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import scalaz.{Failure, Success}

import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.json4s.implicits.json4sToSchema

import com.snowplowanalytics.snowflake.core.{Config, ProcessManifest}

object Main {
def main(args: Array[String]): Unit = {
Config.parseTransformerCli(args) match {
case Some(Right(Config.CliTransformerConfiguration(appConfig, eventsManifestConfig, inbatch))) =>
case Some(Right(Config.CliTransformerConfiguration(appConfig, resolver, eventsManifestConfig, inbatch))) =>

// Always use EMR role for manifest access
val s3 = ProcessManifest.getS3(appConfig.awsRegion)
@@ -37,10 +44,24 @@ object Main {
// Get run folders that are not in RunManifest in any form
val runFolders = manifest.getUnprocessed(appConfig.manifest, appConfig.input)

// Get Atomic schema from Iglu
val atomicSchema = resolver.lookupSchema("iglu:com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0") match {
case Success(jsonSchema) => Schema.parse(fromJsonNode(jsonSchema)) match {
case Some(schema) => schema
case None =>
println("Atomic event schema was invalid")
sys.exit(1)
}
case Failure(error) =>
println("Cannot get atomic event schema")
println(error)
sys.exit(1)
}

runFolders match {
case Right(folders) =>
val configs = folders.map(TransformerJobConfig(appConfig.input, appConfig.stageUrl, _))
TransformerJob.run(spark, manifest, appConfig.manifest, configs, eventsManifestConfig, inbatch)
TransformerJob.run(spark, manifest, appConfig.manifest, configs, eventsManifestConfig, inbatch, atomicSchema)
case Left(error) =>
println("Cannot get list of unprocessed folders")
println(error)
Transformer.scala
@@ -18,6 +18,9 @@ import java.util.UUID
// cats
import cats.data.Validated.{Invalid, Valid}

// schema-ddl
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema

// scala-analytics-sdk
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent
@@ -57,12 +60,33 @@ object Transformer {
/**
* Transform TSV to pair of inventory items and JSON object
*
* @param line enriched event TSV
* @param atomicSchema instance of Schema DDL Schema class for atomic event schema
* @return Event case class instance
*/
def jsonify(line: String): Event = {
def jsonify(line: String, atomicSchema: Schema): Event = {
Event.parse(line) match {
case Valid(event) => event
case Valid(event) =>
val fields = classOf[Event].getDeclaredFields.map(_.getName).zip(event.productIterator.toList) map {
  // For string fields, try to find a property with the same name in the schema, then
  // truncate the string to its maxLength
  case (key, Some(value: String)) =>
    atomicSchema.properties match { // atomic.properties
      case Some(properties) =>
        properties.value.get(key) match { // atomic.properties.$key
          case Some(property) =>
            property.maxLength match { // atomic.properties.$key.maxLength
              case Some(maxLength) => Some(value.take(maxLength.value.intValue))
              case None => Some(value)
            }
          case None => Some(value)
        }
      case None => Some(value)
    }
  // All other fields should be returned as-is
  case (_, value) => value
}
Event.getClass.getMethods.find(x => x.getName == "apply" && x.isBridge).get.invoke(Event, fields.map(_.asInstanceOf[AnyRef]): _*).asInstanceOf[Event]
case Invalid(e) =>
throw new RuntimeException(e.toList.mkString("\n"))
}
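
For context, the nested matches above amount to the following per-field lookup. This is a simplified sketch built only from the schema-ddl accessors visible in this diff; `limitFor` and the example field name are illustrative, not part of the commit:

import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema

object LimitLookupSketch {
  // Find the maxLength declared for a given atomic field, if any
  def limitFor(atomicSchema: Schema, key: String): Option[Int] =
    for {
      properties <- atomicSchema.properties   // atomic.properties
      property   <- properties.value.get(key) // atomic.properties.$key
      maxLength  <- property.maxLength        // atomic.properties.$key.maxLength
    } yield maxLength.value.intValue

  // A string value is then capped as, e.g.:
  // limitFor(atomicSchema, "v_tracker").fold(value)(value.take)
}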
TransformerJob.scala
@@ -14,17 +14,19 @@ package com.snowplowanalytics.snowflake.transformer

import org.apache.spark.sql.{SaveMode, SparkSession}

import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema

import com.snowplowanalytics.snowflake.core.ProcessManifest
import com.snowplowanalytics.snowplow.eventsmanifest.EventsManifest.EventsManifestConfig

object TransformerJob {

/** Process all directories, saving state into DynamoDB */
def run(spark: SparkSession, manifest: ProcessManifest, tableName: String, jobConfigs: List[TransformerJobConfig], eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean): Unit = {
def run(spark: SparkSession, manifest: ProcessManifest, tableName: String, jobConfigs: List[TransformerJobConfig], eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean, atomicSchema: Schema): Unit = {
jobConfigs.foreach { jobConfig =>
println(s"Snowflake Transformer: processing ${jobConfig.runId}. ${System.currentTimeMillis()}")
manifest.add(tableName, jobConfig.runId)
val shredTypes = process(spark, jobConfig, eventsManifestConfig, inbatch)
val shredTypes = process(spark, jobConfig, eventsManifestConfig, inbatch, atomicSchema)
manifest.markProcessed(tableName, jobConfig.runId, shredTypes, jobConfig.output)
println(s"Snowflake Transformer: processed ${jobConfig.runId}. ${System.currentTimeMillis()}")
}
@@ -38,9 +40,10 @@ object TransformerJob {
* @param jobConfig configuration with paths
* @param eventsManifestConfig events manifest config instance
* @param inbatch whether inbatch deduplication should be used
* @param atomicSchema instance of Schema DDL Schema class for atomic event schema
* @return list of discovered shredded types
*/
def process(spark: SparkSession, jobConfig: TransformerJobConfig, eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean) = {
def process(spark: SparkSession, jobConfig: TransformerJobConfig, eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean, atomicSchema: Schema) = {
import spark.implicits._

val sc = spark.sparkContext
Expand All @@ -49,7 +52,7 @@ object TransformerJob {

val events = sc
.textFile(jobConfig.input)
.map { e => Transformer.jsonify(e) }
.map { e => Transformer.jsonify(e, atomicSchema) }
val dedupedEvents = if (inbatch) {
events
.groupBy { e => (e.event_id, e.event_fingerprint) }