Transformer: cache result of flattening schema (close #1086)
istreeter authored and spenes committed Nov 16, 2022
1 parent 1756e68 commit 756b4a7
Showing 16 changed files with 389 additions and 88 deletions.
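
In short, this change stops the shredder from re-flattening a schema's ordered property list for every single event. The Iglu resolver config is parsed once, a plain Resolver[F] replaces the full Client[F, Json], and the flattened Properties are memoized in an LRU map keyed by the resolver's schema-list cache key plus the timestamp at which that list was retrieved. The sketch below shows the memoization pattern in isolation; Key and Props are stand-ins for the real PropertiesKey and Properties, whose definitions live in files not shown in this diff.

import cats.effect.IO
import cats.implicits._
import com.snowplowanalytics.lrumap.{CreateLruMap, LruMap}

object PropertiesCacheSketch {
  type Key   = (String, Long) // stand-in for PropertiesKey: schema-list identity + retrieval timestamp
  type Props = List[String]   // stand-in for schemaddl's Properties

  // Placeholder for the expensive part: fetching every schema in a list and
  // flattening it into an ordered set of properties.
  def expensiveFlatten(schemaList: String): IO[Props] =
    IO(List(s"$schemaList/prop_a", s"$schemaList/prop_b"))

  def cachedFlatten(cache: LruMap[IO, Key, Props])(key: Key): IO[Props] =
    cache.get(key).flatMap {
      case Some(props) => IO.pure(props)                                      // hit: skip the work
      case None        => expensiveFlatten(key._1).flatTap(cache.put(key, _)) // miss: compute, then store
    }

  val demo: IO[(Props, Props)] = for {
    cache  <- CreateLruMap[IO, Key, Props].create(100)
    first  <- cachedFlatten(cache)(("com.acme/example_event/1", 0L)) // computes
    second <- cachedFlatten(cache)(("com.acme/example_event/1", 0L)) // served from the cache
  } yield (first, second)
}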
@@ -70,9 +70,9 @@ object Processing {
): Stream[F, Unit] = {
val transformer: Transformer[F] = config.formats match {
case f: TransformerConfig.Formats.Shred =>
Transformer.ShredTransformer(resources.iglu, f, resources.atomicLengths, processor)
Transformer.ShredTransformer(resources.igluResolver, resources.propertiesCache, f, resources.atomicLengths, processor)
case f: TransformerConfig.Formats.WideRow =>
Transformer.WideRowTransformer(resources.iglu, f, processor)
Transformer.WideRowTransformer(resources.igluResolver, f, processor)
}

val messageProcessorVersion = Semver
@@ -26,18 +26,19 @@ import cats.effect._

import org.http4s.client.blaze.BlazeClientBuilder

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.{InitListCache, InitSchemaCache, Resolver}

import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{Queue, BlobStorage}
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.schemaddl.Properties
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.EventUtils

import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, PropertiesCache, PropertiesKey}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.metrics.Metrics
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sources.Checkpointer

case class Resources[F[_], C](
iglu: Client[F, Json],
igluResolver: Resolver[F],
propertiesCache: PropertiesCache[F],
atomicLengths: Map[String, Int],
producer: Queue.Producer[F],
instanceId: String,
@@ -68,14 +69,16 @@ object Resources {
checkpointer: Queue.Consumer.Message[F] => C
): Resource[F, Resources[F, C]] =
for {
producer <- mkQueue(config.queue)
client <- mkClient(igluConfig)
atomicLengths <- mkAtomicFieldLengthLimit(client.resolver)
instanceId <- mkTransformerInstanceId
blocker <- Blocker[F]
metrics <- Resource.eval(Metrics.build[F](blocker, config.monitoring.metrics))
httpClient <- BlazeClientBuilder[F](executionContext).resource
telemetry <- Telemetry.build[F](
producer <- mkQueue(config.queue)
resolverConfig <- mkResolverConfig(igluConfig)
resolver <- mkResolver(resolverConfig)
propertiesCache <- Resource.eval(CreateLruMap[F, PropertiesKey, Properties].create(resolverConfig.cacheSize))
atomicLengths <- mkAtomicFieldLengthLimit(resolver)
instanceId <- mkTransformerInstanceId
blocker <- Blocker[F]
metrics <- Resource.eval(Metrics.build[F](blocker, config.monitoring.metrics))
httpClient <- BlazeClientBuilder[F](executionContext).resource
telemetry <- Telemetry.build[F](
config.telemetry,
buildName,
buildVersion,
@@ -88,7 +91,8 @@
blobStorage <- mkSink(blocker, config.output)
} yield
Resources(
client,
resolver,
propertiesCache,
atomicLengths,
producer,
instanceId.toString,
@@ -100,14 +104,20 @@
blobStorage
)

private def mkClient[F[_]: Sync: InitSchemaCache: InitListCache](igluConfig: Json): Resource[F, Client[F, Json]] = Resource.eval {
Client
.parseDefault[F](igluConfig)
.leftMap(e => new RuntimeException(s"Error while parsing Iglu config: ${e.getMessage()}"))
private def mkResolverConfig[F[_]: Sync](igluConfig: Json): Resource[F, ResolverConfig] = Resource.eval {
Resolver.parseConfig(igluConfig) match {
case Right(resolverConfig) => Sync[F].pure(resolverConfig)
case Left(error) => Sync[F].raiseError[ResolverConfig](new RuntimeException(s"Error while parsing Iglu resolver config: ${error.getMessage()}"))
}
}

private def mkResolver[F[_]: Sync: InitSchemaCache: InitListCache](resolverConfig: ResolverConfig): Resource[F, Resolver[F]] = Resource.eval {
Resolver.fromConfig[F](resolverConfig)
.leftMap(e => new RuntimeException(s"Error while parsing Iglu resolver config: ${e.getMessage()}"))
.value
.flatMap {
case Right(init) => Sync[F].pure(init)
case Left(error) => Sync[F].raiseError[Client[F, Json]](error)
case Left(error) => Sync[F].raiseError[Resolver[F]](error)
}
}
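
Design note: parsing the ResolverConfig explicitly (mkResolverConfig) instead of hiding it inside Client.parseDefault is what makes resolverConfig.cacheSize available above, so the new properties cache is sized consistently with whatever capacity the deployment already configured for the resolver's own schema caches.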

@@ -15,7 +15,7 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common
import cats.Monad
import cats.data.{EitherT, NonEmptyList}
import cats.effect._
import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
import com.snowplowanalytics.snowplow.analytics.scalasdk.{Data, Event}
@@ -25,10 +25,9 @@ import com.snowplowanalytics.snowplow.rdbloader.common.Common
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{SnowplowEntity, TypesInfo}
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats.WideRow
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.Transformed
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, Transformed}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.{AtomicFieldsProvider, NonAtomicFieldsProvider, ParquetTransformer}
import io.circe.Json

/**
* Includes common operations needed during event transformation
@@ -40,7 +39,8 @@ sealed trait Transformer[F[_]] extends Product with Serializable {
}

object Transformer {
case class ShredTransformer[F[_]: Concurrent: Clock: Timer](iglu: Client[F, Json],
case class ShredTransformer[F[_]: Concurrent: Clock: Timer](igluResolver: Resolver[F],
propertiesCache: PropertiesCache[F],
formats: Formats.Shred,
atomicLengths: Map[String, Int],
processor: Processor) extends Transformer[F] {
@@ -54,7 +54,7 @@
}

def goodTransform(event: Event): EitherT[F, BadRow, List[Transformed]] =
Transformed.shredEvent[F](iglu, isTabular, atomicLengths, processor)(event)
Transformed.shredEvent[F](igluResolver, propertiesCache, isTabular, atomicLengths, processor)(event)

def badTransform(badRow: BadRow): Transformed = {
val SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)) = badRow.schemaKey
@@ -71,9 +71,9 @@
}
}

case class WideRowTransformer[F[_]: Monad: RegistryLookup: Clock](iglu: Client[F, Json],
case class WideRowTransformer[F[_]: Monad: RegistryLookup: Clock](igluResolver: Resolver[F],
format: Formats.WideRow,
processor: Processor) extends Transformer[F] {
processor: Processor) extends Transformer[F] {
def goodTransform(event: Event): EitherT[F, BadRow, List[Transformed]] = {
val result = format match {
case WideRow.JSON =>
@@ -105,7 +105,7 @@
val allTypesFromEvent = event.inventory.map(TypesInfo.WideRow.Type.from)

NonAtomicFieldsProvider
.build(iglu.resolver, allTypesFromEvent.toList)
.build(igluResolver, allTypesFromEvent.toList)
.leftMap { error => igluBadRow(event, error) }
.flatMap {
nonAtomicFields =>
@@ -69,7 +69,7 @@ object ParquetSink {
private def createSchemaFromTypes[F[_] : Concurrent : ContextShift : Timer, C](resources: Resources[F, C],
types: List[Data.ShreddedType]): EitherT[F, FailureDetails.LoaderIgluError, MessageType] = {
for {
nonAtomic <- NonAtomicFieldsProvider.build[F](resources.iglu.resolver, types.map(WideRow.Type.from))
nonAtomic <- NonAtomicFieldsProvider.build[F](resources.igluResolver, types.map(WideRow.Type.from))
allFields = AllFields(AtomicFieldsProvider.static, nonAtomic)
} yield ParquetSchema.build(allFields)
}
@@ -24,11 +24,14 @@ import fs2.{Stream, text}
import io.circe.Json
import io.circe.optics.JsonPath._
import io.circe.parser.{parse => parseCirce}
import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.Registry
import com.snowplowanalytics.iglu.schemaddl.Properties
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.Transformed
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, PropertiesKey, Transformed}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.{Processing, Transformer}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sources.{Checkpointer, ParsedC}
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.TestApplication.TestProcessor
@@ -112,20 +115,22 @@ object TransformingSpec {
val BadPathPrefix = "output=bad"
val DefaultTimestamp = "2020-09-29T10:38:56.653Z"

val defaultIgluClient = Client.IgluCentral
val defaultIgluResolver: Resolver[IO] = Resolver(List(Registry.IgluCentral), None)
val defaultAtomicLengths: Map[String, Int] = Map.empty
val wideRowFormat = TransformerConfig.Formats.WideRow.JSON
val shredFormat = TransformerConfig.Formats.Shred(LoaderMessage.TypesInfo.Shredded.ShreddedFormat.TSV, List.empty, List.empty, List.empty)
val testBlocker = Blocker.liftExecutionContext(concurrent.ExecutionContext.global)
val defaultWindow = Window(1, 1, 1, 1, 1)
val dummyTransformedData = Transformed.Data.DString("")

def propertiesCache: PropertiesCache[IO] = CreateLruMap[IO, PropertiesKey, Properties].create(100).unsafeRunSync()

def createTransformer(formats: TransformerConfig.Formats): Transformer[IO] =
formats match {
case f: TransformerConfig.Formats.Shred =>
Transformer.ShredTransformer(defaultIgluClient, f, defaultAtomicLengths, TestProcessor)
Transformer.ShredTransformer(defaultIgluResolver, propertiesCache, f, defaultAtomicLengths, TestProcessor)
case f: TransformerConfig.Formats.WideRow =>
Transformer.WideRowTransformer(defaultIgluClient, f, TestProcessor)
Transformer.WideRowTransformer(defaultIgluResolver, f, TestProcessor)
}

def transformTestEvents(resourcePath: String,
@@ -37,7 +37,8 @@ import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._
import com.snowplowanalytics.snowplow.analytics.scalasdk.{ParsingError, Event}
import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor, Failure, Payload, FailureDetails}
import com.snowplowanalytics.snowplow.rdbloader.common.Common
import Flattening.{NullCharacter, getOrdered}
import Flattening.NullCharacter
import com.snowplowanalytics.iglu.schemaddl.Properties

object EventUtils {

@@ -138,8 +139,19 @@
* @param instance self-describing JSON that needs to be transformed
* @return list of columns or flattening error
*/
def flatten[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], instance: SelfDescribingData[Json]): EitherT[F, FailureDetails.LoaderIgluError, List[String]] =
getOrdered(resolver, instance.schema).map { ordered => FlatData.flatten(instance.data, ordered, getString, NullCharacter) }
def flatten[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F],
propertiesCache: PropertiesCache[F],
instance: SelfDescribingData[Json]): EitherT[F, FailureDetails.LoaderIgluError, List[String]] = {
Flattening.getDdlProperties(resolver, propertiesCache, instance.schema)
.map(props => mapProperties(props, instance))
}

private def mapProperties(props: Properties, instance: SelfDescribingData[Json]) = {
props
.map { case (pointer, _) =>
FlatData.getPath(pointer.forData, instance.data, getString, NullCharacter)
}
}
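
With the cache in place, flatten walks the memoized Properties — ordered (pointer, schema) pairs — and extracts the pointed-at value of each from the event's JSON. The snippet below is an illustrative, self-contained version of that extraction step using circe; the real code delegates to schemaddl's FlatData.getPath with Flattening.NullCharacter as the fallback for missing values.

import io.circe.Json
import io.circe.parser.parse

// Illustration only: walk a JSON value along a pre-computed path, mirroring
// FlatData.getPath(pointer.forData, instance.data, getString, NullCharacter).
def getPath(path: List[String], json: Json): String =
  path match {
    case Nil =>
      json.fold("\\N",                            // null marker, like NullCharacter
        _.toString,                               // boolean
        _.toString,                               // number
        identity,                                 // string
        arr => Json.fromValues(arr).noSpaces,     // array: serialize
        obj => Json.fromJsonObject(obj).noSpaces) // object: serialize
    case field :: rest =>
      json.asObject.flatMap(_(field)).map(getPath(rest, _)).getOrElse("\\N")
  }

val event   = parse("""{"a": {"b": 42}, "c": "hello"}""").getOrElse(Json.Null)
val columns = List(List("a", "b"), List("c")).map(getPath(_, event))
// columns == List("42", "hello"): one column per property, in schema order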

def getString(json: Json): String =
json.fold(NullCharacter,
@@ -16,20 +16,16 @@ import io.circe.Json
import cats.Monad
import cats.data.EitherT
import cats.syntax.either._

import cats.effect.Clock

import com.snowplowanalytics.iglu.client.resolver.Resolver.{ResolverResult, SchemaListKey}
import com.snowplowanalytics.iglu.core._
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.{Resolver, ClientError}

import com.snowplowanalytics.iglu.schemaddl.IgluSchema
import com.snowplowanalytics.iglu.schemaddl.migrations.{SchemaList => DdlSchemaList}
import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
import com.snowplowanalytics.iglu.schemaddl.{IgluSchema, Properties}
import com.snowplowanalytics.iglu.schemaddl.migrations.{FlatSchema, SchemaList => DdlSchemaList}
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._

import com.snowplowanalytics.snowplow.badrows.FailureDetails

object Flattening {
@@ -39,9 +35,6 @@

val MetaSchema = SchemaKey("com.snowplowanalyics.self-desc", "schema", "jsonschema", SchemaVer.Full(1,0,0))

def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], key: SchemaKey): EitherT[F, FailureDetails.LoaderIgluError, DdlSchemaList] =
getOrdered(resolver, key.vendor, key.name, key.version.model)

def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], vendor: String, name: String, model: Int): EitherT[F, FailureDetails.LoaderIgluError, DdlSchemaList] = {
val criterion = SchemaCriterion(vendor, name, "jsonschema", Some(model), None, None)
val schemaList = resolver.listSchemas(vendor, name, model)
@@ -51,12 +44,47 @@
} yield ordered
}

def getDdlProperties[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F],
propertiesCache: PropertiesCache[F],
schemaKey: SchemaKey): EitherT[F, FailureDetails.LoaderIgluError, Properties] = {
val criterion = SchemaCriterion(schemaKey.vendor, schemaKey.name, "jsonschema", Some(schemaKey.version.model), None, None)

EitherT(resolver.listSchemasResult(schemaKey.vendor, schemaKey.name, schemaKey.version.model))
.leftMap(error => FailureDetails.LoaderIgluError.SchemaListNotFound(criterion, error))
.flatMap {
case cached: ResolverResult.Cached[SchemaListKey, SchemaList] =>
lookupInCache(resolver, propertiesCache, cached)
case ResolverResult.NotCached(schemaList) =>
evaluateProperties(schemaList, resolver)
}
}

def fetch[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F])(key: SchemaKey): EitherT[F, FailureDetails.LoaderIgluError, IgluSchema] =
for {
json <- EitherT(resolver.lookupSchema(key)).leftMap(error => FailureDetails.LoaderIgluError.IgluError(key, error))
schema <- EitherT.fromEither(parseSchema(json))
} yield schema

private def lookupInCache[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F],
propertiesCache: PropertiesCache[F],
resolvedSchemaList: ResolverResult.Cached[SchemaListKey, SchemaList]): EitherT[F, FailureDetails.LoaderIgluError, Properties] = {
val propertiesKey = (resolvedSchemaList.key, resolvedSchemaList.timestamp)

EitherT.liftF(propertiesCache.get(propertiesKey)).flatMap {
case Some(properties) =>
EitherT.pure[F, FailureDetails.LoaderIgluError](properties)
case None =>
evaluateProperties(resolvedSchemaList.value, resolver)
.semiflatTap(props => propertiesCache.put(propertiesKey, props))
}
}

private def evaluateProperties[F[_]: Monad: RegistryLookup: Clock](schemaList: SchemaList,
resolver: Resolver[F]): EitherT[F, FailureDetails.LoaderIgluError, Properties] = {
DdlSchemaList.fromSchemaList(schemaList, fetch(resolver))
.map(FlatSchema.extractProperties)
}
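
Note the shape of the cache key built in lookupInCache: (resolvedSchemaList.key, resolvedSchemaList.timestamp). Because the timestamp of the resolver's cached schema list is part of the key, any refresh of that list — TTL expiry or a newly published schema version — produces a different key, so the flattened properties are recomputed rather than served stale, and the superseded entries simply age out of the LRU map.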

/** Parse JSON into self-describing schema, or return `FlatteningError` */
private def parseSchema(json: Json): Either[FailureDetails.LoaderIgluError, IgluSchema] =
for {