RDB Shredder: add tabular data output (close #151)
chuwy committed Aug 14, 2019
1 parent 95def81 commit 55178f4
Showing 8 changed files with 353 additions and 31 deletions.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
@@ -31,7 +31,7 @@ object Dependencies {
val analyticsSdk = "0.4.2"
val spark = "2.3.2"
val eventsManifest = "0.2.0"
val schemaDdl = "0.9.0"
val schemaDdl = "0.10.0"

// Java (Loader)
val postgres = "42.0.0"
@@ -14,16 +14,31 @@
*/
package com.snowplowanalytics.snowplow.storage.spark

import java.util.UUID
import java.time.Instant
import java.time.format.DateTimeParseException

import io.circe.Json

import cats.Monad
import cats.data.EitherT
import cats.syntax.either._
import cats.syntax.show._
import cats.effect.Clock

import com.snowplowanalytics.iglu.core._
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.ClientError.ResolutionError

import com.snowplowanalytics.iglu.schemaddl.IgluSchema
import com.snowplowanalytics.iglu.schemaddl.migrations.FlatData
import com.snowplowanalytics.iglu.schemaddl.migrations.Migration.OrderedSchemas
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

object EventUtils {
@@ -56,13 +71,66 @@ object EventUtils {
tabular.mkString("\t")
}

/** Build a map of columnName -> maxLength, according to `schema`. Non-string values are not present in the map */
def getAtomicLengths(schema: Json): Either[String, Map[String, Int]] =
for {
schema <- Schema.parse(schema).flatMap(_.properties).map(_.value).toRight("atomic schema does not conform expected format")
lengths = schema.flatMap { case (k, v) => getLength(v).map { l => (k, l)} }
_ <- if (lengths.isEmpty) "atomic schema properties is empty".asLeft else ().asRight
} yield lengths

def getLength(schema: Schema): Option[Int] =
/** Get auxiliary hierarchy/schema columns in TSV format */
def buildMetadata(rootId: UUID, rootTstamp: Instant, schema: SchemaKey): List[String] =
List(schema.vendor, schema.name, schema.format, schema.version.asString,
rootId.toString, rootTstamp.formatted, "events", s"""["events","${schema.name}"]""", "events")

/**
* Error specific to shredding a JSON instance into tabular format:
* either `SchemaList` is unavailable (no Iglu Server hosts these schemas),
* or a particular schema could not be fetched, so the whole flattening algorithm cannot be built
*/
sealed trait FlatteningError
object FlatteningError {
case class SchemaListResolution(error: ResolutionError) extends FlatteningError
case class SchemaResolution(error: ResolutionError) extends FlatteningError
case class Parsing(error: String) extends FlatteningError
}

/**
* Transform a self-describing entity into tabular format, using its known schemas to get the correct order of columns
* @param resolver Iglu resolver to get list of known schemas
* @param instance self-describing JSON that needs to be transformed
* @return list of columns or flattening error
*/
def flatten[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], instance: SelfDescribingData[Json]): EitherT[F, FlatteningError, List[String]] =
getOrdered(resolver, instance.schema).map { ordered => FlatData.flatten(instance.data, ordered, Some(escape)) }

/** Prevents data with newlines and tabs from breaking the loading process */
private def escape(s: String): String =
s.replace('\n', ' ').replace('\t', ' ')

// Cache = Map[SchemaKey, OrderedSchemas]

def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], key: SchemaKey) =
for {
schemaList <- EitherT[F, ResolutionError, SchemaList](resolver.listSchemas(key.vendor, key.name, Some(key.version.model))).leftMap(FlatteningError.SchemaListResolution)
ordered <- OrderedSchemas.fromSchemaList(schemaList, fetch(resolver))
} yield ordered

def fetch[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F])(key: SchemaKey): EitherT[F, FlatteningError, IgluSchema] =
for {
json <- EitherT(resolver.lookupSchema(key, 2)).leftMap(FlatteningError.SchemaResolution)
schema <- EitherT.fromEither(parseSchema(json))
} yield schema

/** Parse JSON into self-describing schema, or return `FlatteningError` */
private def parseSchema(json: Json): Either[FlatteningError, IgluSchema] =
for {
selfDescribing <- SelfDescribingSchema.parse(json).leftMap(code => FlatteningError.Parsing(s"Cannot parse ${json.noSpaces} payload as self-describing schema, ${code.code}"))
parsed <- Schema.parse(selfDescribing.schema).toRight(FlatteningError.Parsing(s"Cannot parse ${selfDescribing.self.schemaKey.toSchemaUri} payload as JSON Schema"))
} yield SelfDescribingSchema(selfDescribing.self, parsed)

/** Get maximum length for a string value */
private def getLength(schema: Schema): Option[Int] =
schema.maxLength.map(_.value.toInt)
}
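
For orientation, a minimal sketch of how the pieces above compose into a single TSV line (hypothetical resolver, eventId, collectorTstamp and entity values; implicit RegistryLookup[Id] and Clock[Id] instances are assumed to be in scope, as they are where Shredded.fromHierarchy calls flatten):

import cats.Id

// Resolve the schema lineage, flatten the instance into ordered columns,
// prepend the hierarchy metadata and join everything into one TSV line.
// NOTE: assumes implicit Monad[Id], RegistryLookup[Id] and Clock[Id] instances are available
def toTsvLine(resolver: Resolver[Id],
              eventId: java.util.UUID,
              collectorTstamp: java.time.Instant,
              entity: SelfDescribingData[Json]): Either[EventUtils.FlatteningError, String] =
  EventUtils.flatten(resolver, entity)
    .map { columns =>
      val meta = EventUtils.buildMetadata(eventId, collectorTstamp, entity.schema)
      (meta ++ columns).mkString("\t")
    }
    .value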
@@ -33,6 +33,7 @@ import scala.util.control.NonFatal

// Spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -69,7 +70,7 @@ object ShredJob extends SparkJob {
val AtomicSchema = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0))

case class Hierarchy(eventId: UUID, collectorTstamp: Instant, entity: SelfDescribingData[Json]) {
def dump: String = json"""
def dumpJson: String = json"""
{
"schema": {
"vendor": ${entity.schema.vendor},
@@ -103,11 +104,20 @@ object ShredJob extends SparkJob {
classOf[Event],
classOf[Instant],
classOf[com.snowplowanalytics.iglu.core.SchemaVer$Full],
classOf[io.circe.Json$JObject],
classOf[io.circe.JsonObject$LinkedHashMapJsonObject],
classOf[io.circe.Json$JObject],
classOf[io.circe.Json$JString],
classOf[io.circe.Json$JArray],
classOf[io.circe.Json$JNull$],
classOf[io.circe.Json$JNumber],
classOf[io.circe.Json$JBoolean],
classOf[io.circe.Json],
Class.forName("io.circe.JsonLong"),
Class.forName("io.circe.JsonDecimal"),
Class.forName("io.circe.JsonBigDecimal"),
Class.forName("io.circe.JsonBiggerDecimal"),
Class.forName("io.circe.JsonDouble"),
Class.forName("io.circe.JsonFloat"),
classOf[java.util.LinkedHashMap[_, _]],
classOf[java.util.ArrayList[_]],
classOf[scala.collection.immutable.Map$EmptyMap$],
@@ -135,7 +145,7 @@ object ShredJob extends SparkJob {
// Processing manifest, existing only on a driver. Iglu Resolver without cache
val manifest = shredConfig.getManifestData.map {
case (m, i) =>
val resolver = singleton.ResolverSingleton.get(shredConfig.igluConfig)
val resolver = singleton.IgluSingleton.get(shredConfig.igluConfig)
ShredderManifest(DynamodbManifest.initialize(m, resolver.cacheless), i)
}

@@ -148,24 +158,28 @@

val eventsManifest: Option[EventsManifestConfig] = shredConfig.duplicateStorageConfig.map { json =>
val config = EventsManifestConfig
.parseJson[Id](singleton.ResolverSingleton.get(shredConfig.igluConfig), json)
.parseJson[Id](singleton.IgluSingleton.get(shredConfig.igluConfig), json)
.valueOr(err => throw FatalEtlError(err))
val _ = singleton.DuplicateStorageSingleton.get(Some(config)) // Just to check it can be initialized
config
}

runJob(manifest, eventsManifest, atomicLengths, job).get
runJob(manifest, eventsManifest, atomicLengths, job, true).get
}

/** Start a job, if necessary recording process to manifest */
def runJob(manifest: Option[ShredderManifest], eventsManifest: Option[EventsManifestConfig], lengths: Map[String, Int], job: ShredJob): Try[Unit] = {
def runJob(manifest: Option[ShredderManifest],
eventsManifest: Option[EventsManifestConfig],
lengths: Map[String, Int],
job: ShredJob,
jsonOnly: Boolean): Try[Unit] = {
manifest match {
case None => // Manifest is not enabled, simply run a job
Try(job.run(lengths, eventsManifest)).map(_ => None)
Try(job.run(lengths, eventsManifest, jsonOnly)).map(_ => None)
case Some(ShredderManifest(manifest, itemId)) => // Manifest is enabled.
// Envelope job into function to pass to `Manifest.processItem` later
val process: ProcessNew = () => Try {
job.run(lengths, eventsManifest)
job.run(lengths, eventsManifest, jsonOnly)
val shreddedTypes = job.shreddedTypes.value.toSet
DynamodbManifest.processedPayload(shreddedTypes)
}
@@ -211,10 +225,11 @@
/**
* The path at which to store the shredded types.
* @param outFolder shredded/good/run=xxx
* @param json whether to use the pre-R31 JSON output path instead of the TSV one
* @return The shredded types output path
*/
def getShreddedTypesOutputPath(outFolder: String): String = {
val shreddedTypesSubdirectory = "shredded-types"
def getShreddedTypesOutputPath(outFolder: String, json: Boolean): String = {
val shreddedTypesSubdirectory = if (json) "shredded-types" else "shredded-tsv" // TODO: change to shredded-json
s"$outFolder${if (outFolder.endsWith("/")) "" else "/"}$shreddedTypesSubdirectory"
}
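
For reference, with an illustrative run folder the two modes resolve to the following paths:

// getShreddedTypesOutputPath("s3://archive/shredded/good/run=2019-08-14/", json = true)
//   == "s3://archive/shredded/good/run=2019-08-14/shredded-types"
// getShreddedTypesOutputPath("s3://archive/shredded/good/run=2019-08-14/", json = false)
//   == "s3://archive/shredded/good/run=2019-08-14/shredded-tsv"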

@@ -274,14 +289,16 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)
* - finding synthetic duplicates and adding them back with new ids
* - writing out JSON contexts as well as properly-formed and malformed events
*/
def run(atomicLengths: Map[String, Int], eventsManifest: Option[EventsManifestConfig]): Unit = {
def run(atomicLengths: Map[String, Int],
eventsManifest: Option[EventsManifestConfig],
jsonOnly: Boolean): Unit = {
import ShredJob._

val input = sc.textFile(shredConfig.inFolder)

// Enriched TSV lines along with their shredded components
val common = input
.map(line => loadAndShred(ResolverSingleton.get(shredConfig.igluConfig), line))
.map(line => loadAndShred(IgluSingleton.get(shredConfig.igluConfig), line))
.setName("common")
.cache()

@@ -290,7 +307,7 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)
.flatMap { shredded => shredded.swap.toOption }
.map { badRow => Row(badRow.toCompactJson) }

// Handling of properly-formed rows; drop bad, turn proper events to `Shredded`
// Handling of properly-formed rows; drop bad, turn proper events to `Event`
// Perform in-batch and cross-batch natural deduplication and write found types to the accumulator;
// only one event from an event id and event fingerprint combination is kept
val good = common
@@ -358,16 +375,25 @@ class ShredJob(@transient val spark: SparkSession, shredConfig: ShredJobConfig)
.text(getAlteredEnrichedOutputPath(shredConfig.outFolder))

// Update the shredded JSONs with the new deduplicated event IDs and stringify
val jsons = goodWithSyntheticDupes
val shredded = goodWithSyntheticDupes
.flatMap(getShreddedEntities)
.map { h => (h.entity.schema.vendor, h.entity.schema.name, h.entity.schema.format, h.entity.schema.version.asString, h.dump) }
.map(Shredded.fromHierarchy(jsonOnly, singleton.IgluSingleton.get(shredConfig.igluConfig).resolver))

if (jsonOnly) {
val jsons = shredded.map(s => s.json.getOrElse(throw FatalEtlError(s"Inconsistent configuration. Can be shredded only into JSON. Got $s")))
writeShredded(jsons, true)
} else { // Partition TSV and JSON
writeShredded(shredded.flatMap(_.json), true)
writeShredded(shredded.flatMap(_.tabular), false)
}

jsons
.toDF("vendor", "name", "format", "version", "json")
.write
.partitionBy("vendor", "name", "format", "version")
.mode(SaveMode.Append)
.text(getShreddedTypesOutputPath(shredConfig.outFolder))
def writeShredded(data: RDD[(String, String, String, String, String)], json: Boolean): Unit =
data
.toDF("vendor", "name", "format", "version", "data")
.write
.partitionBy("vendor", "name", "format", "version")
.mode(SaveMode.Append)
.text(getShreddedTypesOutputPath(shredConfig.outFolder, json))
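
Given Spark's partitionBy, the two sinks end up in Hive-style directories, e.g. (illustrative bucket and entity; note the TSV branch keeps only the schema model in `version`, per Shredded.fromHierarchy):

// s3://archive/shredded/good/run=2019-08-14/shredded-types/vendor=com.acme/name=link_click/format=jsonschema/version=1-0-0/part-*
// s3://archive/shredded/good/run=2019-08-14/shredded-tsv/vendor=com.acme/name=link_click/format=jsonschema/version=1/part-*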

// Deduplication operation failed due to DynamoDB
val dupeFailed = good.flatMap {
@@ -37,14 +37,16 @@ import com.snowplowanalytics.snowplow.rdbloader.generated.ProjectMetadata
* @param outFolder Output folder where the shredded events will be stored
* @param badFolder Output folder where the malformed events will be stored
* @param igluConfig JSON representing the Iglu configuration
* @param jsonOnly don't try to produce TSV output
*/
case class ShredJobConfig(inFolder: String,
outFolder: String,
badFolder: String,
igluConfig: Json,
duplicateStorageConfig: Option[Json],
dynamodbManifestTable: Option[String],
itemId: Option[String]) {
itemId: Option[String],
jsonOnly: Boolean) {

/** Get both manifest table and item id to process */
def getManifestData: Option[(String, String)] =
@@ -92,11 +94,12 @@ object ShredJobConfig {
val itemId = Opts.option[String]("item-id",
"Unique folder identificator for processing manifest (e.g. S3 URL)",
metavar = "<id>").orNone
val jsonOnly = Opts.flag("json-only", "Do not produce tabular output").orFalse

val shredJobConfig = (inputFolder, outputFolder, badFolder, igluConfig, duplicateStorageConfig, processingManifestTable, itemId).mapN {
(input, output, bad, iglu, dupeStorage, manifest, itemId) => ShredJobConfig(input, output, bad, iglu, dupeStorage, manifest, itemId)
val shredJobConfig = (inputFolder, outputFolder, badFolder, igluConfig, duplicateStorageConfig, processingManifestTable, itemId, jsonOnly).mapN {
(input, output, bad, iglu, dupeStorage, manifest, itemId, jsonOnly) => ShredJobConfig(input, output, bad, iglu, dupeStorage, manifest, itemId, jsonOnly)
}.validate("--item-id and --processing-manifest-table must be either both provided or both absent") {
case ShredJobConfig(_, _, _, _, _, manifest, i) => (manifest.isDefined && i.isDefined) || (manifest.isEmpty && i.isEmpty)
case ShredJobConfig(_, _, _, _, _, manifest, i, _) => (manifest.isDefined && i.isDefined) || (manifest.isEmpty && i.isEmpty)
case _ => false
}
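
A behaviour sketch of the new option (other required options elided; only the parsing outcome is shown):

// --json-only passed  => ShredJobConfig(..., jsonOnly = true)   // legacy JSON output only
// flag absent         => ShredJobConfig(..., jsonOnly = false)  // tabular output may also be produced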

@@ -0,0 +1,67 @@
/*
* Copyright (c) 2012-2019 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
package com.snowplowanalytics.snowplow.storage.spark

import cats.Id

import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.snowplow.storage.spark.ShredJob.Hierarchy

/** ADT, representing possible forms of data in blob storage */
sealed trait Shredded {
def json: Option[(String, String, String, String, String)] = this match {
case Shredded.Json(vendor, name, format, version, data) => Some((vendor, name, format, version, data))
case Shredded.Tabular(_, _, _, _, _) => None
}

def tabular: Option[(String, String, String, String, String)] = this match {
case Shredded.Tabular(vendor, name, format, version, data) => Some((vendor, name, format, version, data))
case Shredded.Json(_, _, _, _, _) => None
}
}

object Shredded {

/** Data will be present as JSON, with RDB Loader loading it using JSON Paths. Legacy format */
case class Json(vendor: String, name: String, format: String, version: String, data: String) extends Shredded

/** Data will be present as TSV, with RDB Loader loading it directly */
case class Tabular(vendor: String, name: String, format: String, version: String, data: String) extends Shredded

/**
* Transform a JSON `Hierarchy`, extracted from an enriched event, into a `Shredded` entity,
* specifying how it should look in the destination: JSON or TSV.
* If the flattening algorithm fails at any point, it falls back to the JSON format
*
* @param jsonOnly output can only be JSON. All downstream components should agree on that
* @param resolver Iglu resolver to request all necessary entities
* @param hierarchy actual JSON hierarchy from an enriched event
*/
def fromHierarchy(jsonOnly: Boolean, resolver: => Resolver[Id])(hierarchy: Hierarchy): Shredded = {
val vendor = hierarchy.entity.schema.vendor
val name = hierarchy.entity.schema.name
val format = hierarchy.entity.schema.format
if (jsonOnly)
Json(vendor, name, format, hierarchy.entity.schema.version.asString, hierarchy.dumpJson)
else
EventUtils.flatten(resolver, hierarchy.entity).value match {
case Right(columns) =>
val meta = EventUtils.buildMetadata(hierarchy.eventId, hierarchy.collectorTstamp, hierarchy.entity.schema)
Tabular(vendor, name, format, hierarchy.entity.schema.version.model.toString, (meta ++ columns).mkString("\t"))
case Left(_) =>
Json(vendor, name, format, hierarchy.entity.schema.version.asString, hierarchy.dumpJson)
}
}
}
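
An illustrative Tabular value for a hypothetical com.acme/link_click/jsonschema/1-0-0 entity (all field values invented; the first nine tab-separated fields come from EventUtils.buildMetadata, the rest are the flattened instance columns):

Shredded.Tabular("com.acme", "link_click", "jsonschema", "1",
  List(
    "com.acme", "link_click", "jsonschema", "1-0-0",        // schema coordinates
    "c6ef3124-b53a-4b13-a233-0088f79dcbcb",                  // root (event) id
    "2019-08-14 10:00:00.000",                               // root (collector) timestamp
    "events", """["events","link_click"]""", "events",       // ref_root, ref_tree, ref_parent
    "https://example.com/landing", "_blank"                  // flattened instance fields
  ).mkString("\t"))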
@@ -31,7 +31,7 @@ import com.snowplowanalytics.snowplow.eventsmanifest.{DynamoDbManifest, EventsMa
object singleton {

/** Singleton for Iglu's Resolver to maintain one Resolver per node. */
object ResolverSingleton {
object IgluSingleton {

@volatile private var instance: Client[Id, Json] = _

@@ -44,7 +44,7 @@
if (instance == null) {
synchronized {
if (instance == null) {
instance = getIgluResolver(igluConfig)
instance = getIgluClient(igluConfig)
.valueOr(e => throw FatalEtlError(e.toString))
}
}
@@ -58,7 +58,7 @@
* @param igluConfig JSON representing the Iglu resolver
* @return an Iglu Client or an error message
*/
private[spark] def getIgluResolver(igluConfig: Json): Either[String, Client[Id, Json]] =
private[spark] def getIgluClient(igluConfig: Json): Either[String, Client[Id, Json]] =
Client.parseDefault(igluConfig).leftMap(_.show).value
}
