From 5b7a700c2300fa8c4525e9138ae3017f54a3ae13 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Fri, 8 Jul 2022 08:58:45 +0100 Subject: [PATCH] Transformer: cache result of flattening schema (close #1086) --- .../common/transformation/EventUtils.scala | 18 +- .../common/transformation/Flattening.scala | 50 ++++- .../common/transformation/Transformed.scala | 10 +- .../common/transformation/package.scala | 12 ++ .../CachedFlatteningSpec.scala | 202 ++++++++++++++++++ .../snowplow/rdbloader/dsl/Iglu.scala | 13 +- .../transformer/batch/Transformer.scala | 2 +- .../transformer/batch/spark/singleton.scala | 27 ++- .../transformer/kinesis/Processing.scala | 2 +- .../transformer/kinesis/Resources.scala | 22 +- .../transformer/kinesis/Transformer.scala | 5 +- .../kinesis/sinks/TransformingSpec.scala | 8 +- project/Dependencies.scala | 2 +- 13 files changed, 328 insertions(+), 45 deletions(-) create mode 100644 modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CachedFlatteningSpec.scala diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/EventUtils.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/EventUtils.scala index 3f6acd39f..541759833 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/EventUtils.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/EventUtils.scala @@ -37,7 +37,8 @@ import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._ import com.snowplowanalytics.snowplow.analytics.scalasdk.{ParsingError, Event} import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor, Failure, Payload, FailureDetails} import com.snowplowanalytics.snowplow.rdbloader.common.Common -import Flattening.{NullCharacter, getOrdered} +import Flattening.NullCharacter +import com.snowplowanalytics.iglu.schemaddl.Properties object EventUtils { @@ -138,8 +139,19 @@ object EventUtils { * @param instance self-describing JSON that needs to be transformed * @return list of columns or flattening error */ - def flatten[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], instance: SelfDescribingData[Json]): EitherT[F, FailureDetails.LoaderIgluError, List[String]] = - getOrdered(resolver, instance.schema).map { ordered => FlatData.flatten(instance.data, ordered, getString, NullCharacter) } + def flatten[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], + propertiesCache: PropertiesCache[F], + instance: SelfDescribingData[Json]): EitherT[F, FailureDetails.LoaderIgluError, List[String]] = { + Flattening.getDdlProperties(resolver, propertiesCache, instance.schema) + .map(props => mapProperties(props, instance)) + } + + private def mapProperties(props: Properties, instance: SelfDescribingData[Json]) = { + props + .map { case (pointer, _) => + FlatData.getPath(pointer.forData, instance.data, getString, NullCharacter) + } + } def getString(json: Json): String = json.fold(NullCharacter, diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Flattening.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Flattening.scala index 92d5d0e7e..1dd4098ce 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Flattening.scala +++ 
b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Flattening.scala @@ -16,20 +16,16 @@ import io.circe.Json import cats.Monad import cats.data.EitherT import cats.syntax.either._ - import cats.effect.Clock - +import com.snowplowanalytics.iglu.client.resolver.Resolver.{Cached, ListSchemasKey, NotCached} import com.snowplowanalytics.iglu.core._ import com.snowplowanalytics.iglu.core.circe.implicits._ - import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup -import com.snowplowanalytics.iglu.client.{Resolver, ClientError} - -import com.snowplowanalytics.iglu.schemaddl.IgluSchema -import com.snowplowanalytics.iglu.schemaddl.migrations.{SchemaList => DdlSchemaList} +import com.snowplowanalytics.iglu.client.{ClientError, Resolver} +import com.snowplowanalytics.iglu.schemaddl.{IgluSchema, Properties} +import com.snowplowanalytics.iglu.schemaddl.migrations.{FlatSchema, SchemaList => DdlSchemaList} import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._ - import com.snowplowanalytics.snowplow.badrows.FailureDetails object Flattening { @@ -39,9 +35,6 @@ object Flattening { val MetaSchema = SchemaKey("com.snowplowanalyics.self-desc", "schema", "jsonschema", SchemaVer.Full(1,0,0)) - def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], key: SchemaKey): EitherT[F, FailureDetails.LoaderIgluError, DdlSchemaList] = - getOrdered(resolver, key.vendor, key.name, key.version.model) - def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], vendor: String, name: String, model: Int): EitherT[F, FailureDetails.LoaderIgluError, DdlSchemaList] = { val criterion = SchemaCriterion(vendor, name, "jsonschema", Some(model), None, None) val schemaList = resolver.listSchemas(vendor, name, model) @@ -51,12 +44,47 @@ object Flattening { } yield ordered } + def getDdlProperties[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], + propertiesCache: PropertiesCache[F], + schemaKey: SchemaKey): EitherT[F, FailureDetails.LoaderIgluError, Properties] = { + val criterion = SchemaCriterion(schemaKey.vendor, schemaKey.name, "jsonschema", Some(schemaKey.version.model), None, None) + + EitherT(resolver.listSchemasResult(schemaKey.vendor, schemaKey.name, schemaKey.version.model)) + .leftMap(error => FailureDetails.LoaderIgluError.SchemaListNotFound(criterion, error)) + .flatMap { + case cached: Cached[ListSchemasKey, SchemaList] => + lookupInCache(resolver, propertiesCache, cached) + case NotCached(schemaList) => + evaluateProperties(schemaList, resolver) + } + } + def fetch[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F])(key: SchemaKey): EitherT[F, FailureDetails.LoaderIgluError, IgluSchema] = for { json <- EitherT(resolver.lookupSchema(key)).leftMap(error => FailureDetails.LoaderIgluError.IgluError(key, error)) schema <- EitherT.fromEither(parseSchema(json)) } yield schema + private def lookupInCache[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], + propertiesCache: PropertiesCache[F], + resolvedSchemaList: Cached[ListSchemasKey, SchemaList]): EitherT[F, FailureDetails.LoaderIgluError, Properties] = { + val propertiesKey = (resolvedSchemaList.key, resolvedSchemaList.timestamp) + + EitherT.liftF(propertiesCache.get(propertiesKey)).flatMap { + case Some(properties) => + EitherT.pure[F, FailureDetails.LoaderIgluError](properties) + case None => + evaluateProperties(resolvedSchemaList.value, resolver) + .semiflatTap(props => 
propertiesCache.put(propertiesKey, props)) + } + } + + private def evaluateProperties[F[_]: Monad: RegistryLookup: Clock](schemaList: SchemaList, + resolver: Resolver[F]): EitherT[F, FailureDetails.LoaderIgluError, Properties] = { + DdlSchemaList.fromSchemaList(schemaList, fetch(resolver)) + .map(FlatSchema.extractProperties) + } + /** Parse JSON into self-describing schema, or return `FlatteningError` */ private def parseSchema(json: Json): Either[FailureDetails.LoaderIgluError, IgluSchema] = for { diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Transformed.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Transformed.scala index a84a06c10..ef3aa705f 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Transformed.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/Transformed.scala @@ -94,6 +94,7 @@ object Transformed { * @return either bad row (in case of failed flattening) or list of shredded entities inside original event */ def shredEvent[F[_]: Monad: RegistryLookup: Clock](igluClient: Client[F, CJson], + propertiesCache: PropertiesCache[F], isTabular: SchemaKey => Boolean, atomicLengths: Map[String, Int], processor: Processor) @@ -101,7 +102,7 @@ object Transformed { Hierarchy.fromEvent(event) .traverse { hierarchy => val tabular = isTabular(hierarchy.entity.schema) - fromHierarchy(tabular, igluClient.resolver)(hierarchy) + fromHierarchy(tabular, igluClient.resolver, propertiesCache)(hierarchy) } .leftMap { error => EventUtils.shreddingBadRow(event, processor)(NonEmptyList.one(error)) } .map { shredded => @@ -122,11 +123,14 @@ object Transformed { * @param resolver Iglu resolver to request all necessary entities * @param hierarchy actual JSON hierarchy from an enriched event */ - private def fromHierarchy[F[_]: Monad: RegistryLookup: Clock](tabular: Boolean, resolver: => Resolver[F])(hierarchy: Hierarchy): EitherT[F, FailureDetails.LoaderIgluError, Transformed] = { + private def fromHierarchy[F[_]: Monad: RegistryLookup: Clock](tabular: Boolean, + resolver: => Resolver[F], + propertiesCache: PropertiesCache[F]) + (hierarchy: Hierarchy): EitherT[F, FailureDetails.LoaderIgluError, Transformed] = { val vendor = hierarchy.entity.schema.vendor val name = hierarchy.entity.schema.name if (tabular) { - EventUtils.flatten(resolver, hierarchy.entity).map { columns => + EventUtils.flatten(resolver, propertiesCache, hierarchy.entity).map { columns => val meta = EventUtils.buildMetadata(hierarchy.eventId, hierarchy.collectorTstamp, hierarchy.entity.schema) Shredded.Tabular(vendor, name, hierarchy.entity.schema.version.model, Transformed.Data.DString((meta ++ columns).mkString("\t"))) } diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/package.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/package.scala index 54f1b5051..a06d0936d 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/package.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/transformation/package.scala @@ -14,13 +14,25 @@ */ package com.snowplowanalytics.snowplow.rdbloader.common +import com.snowplowanalytics.iglu.client.resolver.Resolver.ListSchemasKey + import java.time.{Instant, ZoneOffset} import 
java.time.format.DateTimeFormatter
+import com.snowplowanalytics.lrumap.LruMap
+import com.snowplowanalytics.iglu.schemaddl.Properties
 
 package object transformation {
 
   private val Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
 
+  /**
+   * Using a compound key with timestamps keeps the Iglu schema-list cache and the properties cache in sync.
+   * See https://github.com/snowplow/snowplow-rdb-loader/issues/1086 for more details.
+   */
+  type SchemaListCachingTime = Int
+  type PropertiesKey = (ListSchemasKey, SchemaListCachingTime)
+  type PropertiesCache[F[_]] = LruMap[F, PropertiesKey, Properties]
+
   implicit class InstantOps(time: Instant) {
     def formatted: String = {
       time.atOffset(ZoneOffset.UTC).format(Formatter)
diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CachedFlatteningSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CachedFlatteningSpec.scala
new file mode 100644
index 000000000..6665aa2f4
--- /dev/null
+++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/CachedFlatteningSpec.scala
@@ -0,0 +1,202 @@
+package com.snowplowanalytics.snowplow.rdbloader.common
+
+import cats.effect.Clock
+import cats.{Id, Monad}
+import com.snowplowanalytics.iglu.client.resolver.Resolver
+import com.snowplowanalytics.iglu.client.resolver.registries.{Registry, RegistryError, RegistryLookup}
+import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaList, SelfDescribingData}
+import com.snowplowanalytics.iglu.schemaddl.Properties
+import com.snowplowanalytics.lrumap.CreateLruMap
+import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, PropertiesCache, PropertiesKey}
+import io.circe.Json
+import io.circe.literal.JsonStringContext
+import org.specs2.mutable.Specification
+
+import scala.concurrent.duration.{MILLISECONDS, NANOSECONDS, TimeUnit}
+
+class CachedFlatteningSpec extends Specification {
+
+  //single 'field1' field
+  val `original schema - 1 field`: Json =
+    json"""
+        {
+          "$$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
+          "description": "Test schema 1",
+          "self": {
+            "vendor": "com.snowplowanalytics.snowplow",
+            "name": "test_schema",
+            "format": "jsonschema",
+            "version": "1-0-0"
+          },
+
+          "type": "object",
+          "properties": {
+            "field1": { "type": "string" }
+          }
+        }
+    """
+
+  //same key as the original schema, but with an additional `field2` field
+  val `patched schema - 2 fields`: Json =
+    json"""
+        {
+          "$$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
+          "description": "Test schema 1",
+          "self": {
+            "vendor": "com.snowplowanalytics.snowplow",
+            "name": "test_schema",
+            "format": "jsonschema",
+            "version": "1-0-0"
+          },
+
+          "type": "object",
+          "properties": {
+            "field1": { "type": "string" },
+            "field2": { "type": "integer" }
+          }
+        }
+    """
+
+
+  val cacheTtl = 10 //seconds
+  val dataToFlatten = json"""{ "field1": "1", "field2": 2 }"""
+  val schemaKey = "iglu:com.snowplowanalytics.snowplow/test_schema/jsonschema/1-0-0"
+
+  // (vendor, name, model)
+  val propertiesKey = ("com.snowplowanalytics.snowplow", "test_schema", 1)
+
+  "Cached properties should be in sync with cached schemas/lists in the Iglu client" >> {
+
+    "(1) original schema only, 1 flatten call => 1 field flattened" in {
+      val propertiesCache = getCache
+      val result = flatten(propertiesCache, getResolver)(
+        currentTime = 1000, //ms
+        schemaInRegistry = `original schema - 1 field`
+      )
+
+      result must beEqualTo(List("1"))
+
+      //Properties are cached after first call (1 second)
+      propertiesCache.get((propertiesKey, 1)) must beSome
+    }
+
+    "(2) original schema is patched between calls, no delay => original schema is still cached => 1 field flattened" in {
+      val propertiesCache = getCache
+      val resolver = getResolver
+
+      //first call
+      flatten(propertiesCache, resolver)(
+        currentTime = 1000, //ms
+        schemaInRegistry = `original schema - 1 field`
+      )
+
+      //second call, same time
+      val result = flatten(propertiesCache, resolver)(
+        currentTime = 1000, //ms
+        schemaInRegistry = `patched schema - 2 fields` //different schema with the same key!
+      )
+
+      //no data from the patched schema
+      result must beEqualTo(List("1"))
+
+      //Properties are cached after first call (1 second)
+      propertiesCache.get((propertiesKey, 1)) must beSome
+    }
+
+    "(3) schema is patched, delay between flatten calls is less than cache TTL => original schema is still cached => 1 field flattened" in {
+      val propertiesCache = getCache
+      val resolver = getResolver
+
+      //first call
+      flatten(propertiesCache, resolver)(
+        currentTime = 1000, //ms
+        schemaInRegistry = `original schema - 1 field`
+      )
+
+      //second call, 2s later, less than the 10s TTL
+      val result = flatten(propertiesCache, resolver)(
+        currentTime = 3000, //ms
+        schemaInRegistry = `patched schema - 2 fields` //different schema with the same key!
+      )
+
+      //no data from the patched schema
+      result must beEqualTo(List("1"))
+
+      //Properties are cached after first call (1 second)
+      propertiesCache.get((propertiesKey, 1)) must beSome
+
+      //Properties are not cached after second call (3 seconds)
+      propertiesCache.get((propertiesKey, 3)) must beNone
+    }
+
+    "(4) schema is patched, delay between flatten calls is greater than cache TTL => original schema is expired => using patched schema => 2 fields flattened" in {
+      val propertiesCache = getCache
+      val resolver = getResolver
+
+      //first call
+      flatten(propertiesCache, resolver)(
+        currentTime = 1000, //ms
+        schemaInRegistry = `original schema - 1 field`
+      )
+
+      //second call, 12s later, greater than the 10s TTL
+      val result = flatten(propertiesCache, resolver)(
+        currentTime = 13000, //ms
+        schemaInRegistry = `patched schema - 2 fields` //different schema with the same key!
+      )
+
+      //Cache content expired, patched schema is fetched => 2 fields flattened
+      result must beEqualTo(List("1", "2"))
+
+      //Properties are cached after first call (1 second)
+      propertiesCache.get((propertiesKey, 1)) must beSome
+
+      //Properties are cached after second call (13 seconds)
+      propertiesCache.get((propertiesKey, 13)) must beSome
+    }
+  }
+
+  //Helper method to wire all test dependencies and execute EventUtils.flatten
+  private def flatten(propertiesCache: PropertiesCache[Id], resolver: Resolver[Id])
+                     (currentTime: Long,
+                      schemaInRegistry: Json): List[String] = {
+
+    //Test lookup ignores the passed registry and always returns the schema stored in the schemaInRegistry variable
+    val testRegistryLookup: RegistryLookup[Id] = new RegistryLookup[Id] {
+      override def lookup(registry: Registry,
+                          schemaKey: SchemaKey): Id[Either[RegistryError, Json]] =
+        Right(schemaInRegistry)
+
+      override def list(registry: Registry,
+                        vendor: String,
+                        name: String,
+                        model: Int): Id[Either[RegistryError, SchemaList]] =
+        Right(SchemaList(List(SchemaKey.fromUri("iglu:com.snowplowanalytics.snowplow/test_schema/jsonschema/1-0-0").right.get)))
+    }
+
+    val staticClock: Clock[Id] = new Clock[Id] {
+      override def realTime(unit: TimeUnit): Id[Long] =
+        unit.convert(currentTime, MILLISECONDS)
+
+      override def monotonic(unit: TimeUnit): Id[Long] =
+        unit.convert(currentTime * 1000000, NANOSECONDS)
+    }
+
+    val data = SelfDescribingData(schema = SchemaKey.fromUri(schemaKey).right.get, data = dataToFlatten)
+
+    EventUtils.flatten(resolver, propertiesCache, data)(Monad[Id], testRegistryLookup, staticClock).value.right.get
+  }
+
+  private def getCache: PropertiesCache[Id] = CreateLruMap[Id, PropertiesKey, Properties].create(100)
+
+  private def getResolver: Resolver[Id] = {
+    Resolver.init[Id](
+      cacheSize = 10,
+      cacheTtl = Some(cacheTtl),
+      refs = Registry.Embedded( //not used in the test, since the custom RegistryLookup above fixes the returned schema
+        Registry.Config("Test", 0, List.empty),
+        path = "/fake"
+      )
+    )
+  }
+}
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Iglu.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Iglu.scala
index bcc8e9dd4..85c3d1bb8 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Iglu.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Iglu.scala
@@ -23,7 +23,6 @@ import io.circe.syntax._
 import org.http4s.client.Client
 
 import com.snowplowanalytics.iglu.client.resolver.{Resolver, ResolverCache}
-import com.snowplowanalytics.iglu.client.{Client => IgluClient}
 import com.snowplowanalytics.iglu.client.resolver.registries.{RegistryLookup, Http4sRegistryLookup}
 
 import com.snowplowanalytics.iglu.schemaddl.migrations.SchemaList
@@ -51,11 +50,13 @@ object Iglu {
       Http4sRegistryLookup[F](httpClient)
 
     val buildResolver: F[Resolver[F]] =
-      IgluClient
-        .parseDefault[F](igluConfig)
-        .map(_.resolver.copy(cache = none[ResolverCache[F]])) // Disable cache to not re-fetch the stale state
-        .leftMap { decodingFailure => new IllegalArgumentException(s"Cannot initialize Iglu Resolver: ${decodingFailure.show}") }
-        .rethrowT
+      Resolver.parse[F](igluConfig)
+        .map {
+          _
+            .map(_.copy(cache = none[ResolverCache[F]])) // Disable the cache so stale state is not re-fetched
+            .leftMap { decodingFailure => new IllegalArgumentException(s"Cannot initialize Iglu Resolver: ${decodingFailure.show}") }
+        }
+        .rethrow
 
     Resource.eval[F, Resolver[F]](buildResolver).map[F, Iglu[F]] { resolver =>
       (vendor: String, 
name: String, model: Int) => { diff --git a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/Transformer.scala b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/Transformer.scala index 37002c7da..d499d0c53 100644 --- a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/Transformer.scala +++ b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/Transformer.scala @@ -73,7 +73,7 @@ object Transformer { } def goodTransform(event: Event, eventsCounter: LongAccumulator): List[Transformed] = - Transformed.shredEvent[Id](IgluSingleton.get(igluConfig), isTabular, atomicLengths, ShredJob.BadRowsProcessor)(event).value match { + Transformed.shredEvent[Id](IgluSingleton.get(igluConfig), PropertiesCacheSingleton.get, isTabular, atomicLengths, ShredJob.BadRowsProcessor)(event).value match { case Right(shredded) => TypesAccumulator.recordType(typesAccumulator, TypesAccumulator.shreddedTypeConverter(findFormat))(event.inventory) timestampsAccumulator.add(event) diff --git a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/singleton.scala b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/singleton.scala index 129d498dd..c42414ac2 100644 --- a/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/singleton.scala +++ b/modules/transformer-batch/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/batch/spark/singleton.scala @@ -16,17 +16,17 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.batch.spark import cats.Id import cats.syntax.either._ -import cats.syntax.show._ import cats.syntax.option._ - -import io.circe.Json - +import cats.syntax.show._ import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials} import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder import com.snowplowanalytics.iglu.client.Client - -import com.snowplowanalytics.snowplow.eventsmanifest.{EventsManifestConfig, EventsManifest, DynamoDbManifest} +import com.snowplowanalytics.iglu.schemaddl.Properties +import com.snowplowanalytics.lrumap.CreateLruMap +import com.snowplowanalytics.snowplow.eventsmanifest.{DynamoDbManifest, EventsManifest, EventsManifestConfig} +import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, PropertiesKey} +import io.circe.Json /** Singletons needed for unserializable or stateful classes. 
*/ object singleton { @@ -94,4 +94,19 @@ object singleton { instance } } + + object PropertiesCacheSingleton { + @volatile private var instance: PropertiesCache[Id] = _ + + def get: PropertiesCache[Id] = { + if (instance == null) { + synchronized { + if (instance == null) { + instance = CreateLruMap[Id, PropertiesKey, Properties].create(100) + } + } + } + instance + } + } } diff --git a/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/Processing.scala b/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/Processing.scala index 4e7a46d7d..330e51d21 100644 --- a/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/Processing.scala +++ b/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/Processing.scala @@ -75,7 +75,7 @@ object Processing { val transformer: Transformer[F] = config.formats match { case f: TransformerConfig.Formats.Shred => - Transformer.ShredTransformer(resources.iglu, f, resources.atomicLengths) + Transformer.ShredTransformer(resources.iglu, resources.propertiesCache, f, resources.atomicLengths) case f: TransformerConfig.Formats.WideRow => Transformer.WideRowTransformer(resources.iglu, f) } diff --git a/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/Resources.scala b/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/Resources.scala index ec66f3a70..fbf052830 100644 --- a/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/Resources.scala +++ b/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/Resources.scala @@ -27,14 +27,15 @@ import cats.effect.{Blocker, Clock, Concurrent, ConcurrentEffect, ContextShift, import blobstore.Store -import io.circe.Json +import com.snowplowanalytics.lrumap.CreateLruMap import com.snowplowanalytics.iglu.client.Client import com.snowplowanalytics.iglu.client.resolver.{InitListCache, InitSchemaCache, Resolver} +import com.snowplowanalytics.iglu.schemaddl.Properties import com.snowplowanalytics.aws.AWSQueue -import com.snowplowanalytics.snowplow.rdbloader.common.transformation.EventUtils +import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, PropertiesCache, PropertiesKey} import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.QueueConfig import com.snowplowanalytics.snowplow.rdbloader.transformer.metrics.Metrics @@ -45,6 +46,7 @@ import com.snowplowanalytics.snowplow.rdbloader.transformer.kinesis.generated.Bu case class Resources[F[_]]( iglu: Client[F, Json], + propertiesCache: PropertiesCache[F], atomicLengths: Map[String, Int], awsQueue: AWSQueue[F], instanceId: String, @@ -75,16 +77,18 @@ object Resources { executionContext: ExecutionContext ): Resource[F, Resources[F]] = for { - client <- mkClient(igluConfig) - atomicLengths <- mkAtomicFieldLengthLimit(client.resolver) - instanceId <- mkTransformerInstanceId - blocker <- Blocker[F] - store <- Resource.eval(mkStore[F](blocker, config.output.path)) - metrics <- Resource.eval(Metrics.build[F](blocker, config.monitoring.metrics)) - telemetry <- Telemetry.build[F](config, BuildInfo.name, BuildInfo.version, executionContext) + client <- mkClient(igluConfig) + propertiesCache <- Resource.eval(CreateLruMap[F, PropertiesKey, 
Properties].create(100)) + atomicLengths <- mkAtomicFieldLengthLimit(client.resolver) + instanceId <- mkTransformerInstanceId + blocker <- Blocker[F] + store <- Resource.eval(mkStore[F](blocker, config.output.path)) + metrics <- Resource.eval(Metrics.build[F](blocker, config.monitoring.metrics)) + telemetry <- Telemetry.build[F](config, BuildInfo.name, BuildInfo.version, executionContext) } yield Resources( client, + propertiesCache, atomicLengths, awsQueue, instanceId.toString, diff --git a/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/Transformer.scala b/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/Transformer.scala index 91a1ef1cd..6aace152f 100644 --- a/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/Transformer.scala +++ b/modules/transformer-kinesis/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/Transformer.scala @@ -25,7 +25,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.Common import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{SnowplowEntity, TypesInfo} import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Formats.WideRow -import com.snowplowanalytics.snowplow.rdbloader.common.transformation.Transformed +import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, Transformed} import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.{AtomicFieldsProvider, NonAtomicFieldsProvider, ParquetTransformer} import io.circe.Json @@ -41,6 +41,7 @@ sealed trait Transformer[F[_]] extends Product with Serializable { object Transformer { case class ShredTransformer[F[_]: Concurrent: Clock: Timer](iglu: Client[F, Json], + propertiesCache: PropertiesCache[F], formats: Formats.Shred, atomicLengths: Map[String, Int]) extends Transformer[F] { /** Check if `shredType` should be transformed into TSV */ @@ -53,7 +54,7 @@ object Transformer { } def goodTransform(event: Event): EitherT[F, BadRow, List[Transformed]] = - Transformed.shredEvent[F](iglu, isTabular, atomicLengths, Processing.Application)(event) + Transformed.shredEvent[F](iglu, propertiesCache, isTabular, atomicLengths, Processing.Application)(event) def badTransform(badRow: BadRow): Transformed = { val SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)) = badRow.schemaKey diff --git a/modules/transformer-kinesis/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/sinks/TransformingSpec.scala b/modules/transformer-kinesis/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/sinks/TransformingSpec.scala index a905ebb00..b91cf84c5 100644 --- a/modules/transformer-kinesis/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/sinks/TransformingSpec.scala +++ b/modules/transformer-kinesis/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/kinesis/sinks/TransformingSpec.scala @@ -25,10 +25,12 @@ import io.circe.Json import io.circe.optics.JsonPath._ import io.circe.parser.{parse => parseCirce} import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.schemaddl.Properties +import com.snowplowanalytics.lrumap.CreateLruMap import 
com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig -import com.snowplowanalytics.snowplow.rdbloader.common.transformation.Transformed +import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, PropertiesKey, Transformed} import com.snowplowanalytics.snowplow.rdbloader.transformer.kinesis.{Processing, Transformer} import com.snowplowanalytics.snowplow.rdbloader.transformer.kinesis.sources.{Checkpointer, ParsedC, file => FileSource} import org.specs2.mutable.Specification @@ -119,10 +121,12 @@ object TransformingSpec { val defaultWindow = Window(1, 1, 1, 1, 1) val dummyTransformedData = Transformed.Data.DString("") + def propertiesCache: PropertiesCache[IO] = CreateLruMap[IO, PropertiesKey, Properties].create(100).unsafeRunSync() + def createTransformer(formats: TransformerConfig.Formats): Transformer[IO] = formats match { case f: TransformerConfig.Formats.Shred => - Transformer.ShredTransformer(defaultIgluClient, f, defaultAtomicLengths) + Transformer.ShredTransformer(defaultIgluClient, propertiesCache, f, defaultAtomicLengths) case f: TransformerConfig.Formats.WideRow => Transformer.WideRowTransformer(defaultIgluClient, f) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index edba69617..451305b49 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,7 +17,7 @@ object Dependencies { object V { // Scala (Loader) val decline = "2.1.0" - val igluClient = "1.1.1" + val igluClient = "1.2.0-M6" val igluCore = "1.0.0" val badrows = "2.1.0" val analyticsSdk = "3.0.1"
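
Usage sketch (not part of the diff): each transformer allocates a single properties cache per process and passes it to Transformed.shredEvent alongside the Iglu client. The snippet below mirrors the cache size of 100 used in Resources.scala and singleton.scala above; cats-effect IO and the object name CacheWiringExample are illustrative assumptions, not code from this patch.

    import cats.effect.IO
    import com.snowplowanalytics.iglu.schemaddl.Properties
    import com.snowplowanalytics.lrumap.CreateLruMap
    import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, PropertiesKey}

    object CacheWiringExample {
      // One LRU map per process; its key is (schema-list key, resolver cache timestamp),
      // so once the resolver's cached schema list expires, lookups here miss as well
      // and the properties are re-derived from the freshly fetched schema list.
      val propertiesCache: IO[PropertiesCache[IO]] =
        CreateLruMap[IO, PropertiesKey, Properties].create(100)

      // The cache is then threaded through to shredding, e.g.
      //   Transformed.shredEvent[IO](igluClient, cache, isTabular, atomicLengths, processor)(event)
    }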