Transformer: cache result of flattening schema (close #1086)
istreeter authored and pondzix committed Sep 30, 2022
1 parent 4940299 commit c5baacf
Showing 13 changed files with 330 additions and 45 deletions.
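At a glance: EventUtils.flatten used to re-derive the ordered schema list, and re-flatten it, for every single event. This commit derives the column Properties once and memoizes them in an LRU map whose key combines the Iglu resolver's schema-list cache key with the second at which that list was cached, so the properties cache can never outlive the resolver's own cache. A minimal sketch of the key shape, not part of the commit, with values borrowed from the spec at the bottom of the diff:

import com.snowplowanalytics.iglu.client.resolver.Resolver.ListSchemasKey
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.PropertiesKey

object KeyShapeSketch {
  // (vendor, name, model) identifies the schema family being flattened...
  val listKey: ListSchemasKey = ("com.snowplowanalytics.snowplow", "test_schema", 1)
  // ...and the Int is the second at which the resolver cached its schema list.
  val key: PropertiesKey = (listKey, 1)
}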
EventUtils.scala
@@ -37,7 +37,8 @@ import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._
 import com.snowplowanalytics.snowplow.analytics.scalasdk.{ParsingError, Event}
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor, Failure, Payload, FailureDetails}
 import com.snowplowanalytics.snowplow.rdbloader.common.Common
-import Flattening.{NullCharacter, getOrdered}
+import Flattening.NullCharacter
+import com.snowplowanalytics.iglu.schemaddl.Properties
 
 object EventUtils {
 
@@ -138,8 +139,19 @@
    * @param instance self-describing JSON that needs to be transformed
    * @return list of columns or flattening error
    */
-  def flatten[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], instance: SelfDescribingData[Json]): EitherT[F, FailureDetails.LoaderIgluError, List[String]] =
-    getOrdered(resolver, instance.schema).map { ordered => FlatData.flatten(instance.data, ordered, getString, NullCharacter) }
+  def flatten[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F],
+                                                  propertiesCache: PropertiesCache[F],
+                                                  instance: SelfDescribingData[Json]): EitherT[F, FailureDetails.LoaderIgluError, List[String]] = {
+    Flattening.getDdlProperties(resolver, propertiesCache, instance.schema)
+      .map(props => mapProperties(props, instance))
+  }
+
+  private def mapProperties(props: Properties, instance: SelfDescribingData[Json]) = {
+    props
+      .map { case (pointer, _) =>
+        FlatData.getPath(pointer.forData, instance.data, getString, NullCharacter)
+      }
+  }
 
   def getString(json: Json): String =
     json.fold(NullCharacter,
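The getString helper shown above is what turns each extracted JSON value into a TSV cell. A quick illustration of its contract, not part of the commit (the string rendering of numbers is assumed from the spec below, where the integer 2 flattens to "2"):

import io.circe.Json
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.EventUtils

object GetStringSketch {
  // Scalars fold to their string form and JSON null to NullCharacter,
  // so a missing or null field still occupies its column in the TSV row.
  val s = EventUtils.getString(Json.fromString("1")) // "1"
  val i = EventUtils.getString(Json.fromInt(2))      // "2"
  val n = EventUtils.getString(Json.Null)            // the NullCharacter marker
}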
Flattening.scala
@@ -16,20 +16,16 @@ import io.circe.Json
 import cats.Monad
 import cats.data.EitherT
 import cats.syntax.either._
-
 import cats.effect.Clock
-
+import com.snowplowanalytics.iglu.client.resolver.Resolver.{Cached, ListSchemasKey, NotCached}
 import com.snowplowanalytics.iglu.core._
 import com.snowplowanalytics.iglu.core.circe.implicits._
-
 import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
-import com.snowplowanalytics.iglu.client.{Resolver, ClientError}
-
-import com.snowplowanalytics.iglu.schemaddl.IgluSchema
-import com.snowplowanalytics.iglu.schemaddl.migrations.{SchemaList => DdlSchemaList}
+import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
+import com.snowplowanalytics.iglu.schemaddl.{IgluSchema, Properties}
+import com.snowplowanalytics.iglu.schemaddl.migrations.{FlatSchema, SchemaList => DdlSchemaList}
 import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
 import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._
-
 import com.snowplowanalytics.snowplow.badrows.FailureDetails
 
 object Flattening {
@@ -39,9 +35,6 @@

   val MetaSchema = SchemaKey("com.snowplowanalyics.self-desc", "schema", "jsonschema", SchemaVer.Full(1,0,0))
 
-  def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], key: SchemaKey): EitherT[F, FailureDetails.LoaderIgluError, DdlSchemaList] =
-    getOrdered(resolver, key.vendor, key.name, key.version.model)
-
   def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], vendor: String, name: String, model: Int): EitherT[F, FailureDetails.LoaderIgluError, DdlSchemaList] = {
     val criterion = SchemaCriterion(vendor, name, "jsonschema", Some(model), None, None)
     val schemaList = resolver.listSchemas(vendor, name, model)
@@ -51,12 +44,47 @@
     } yield ordered
   }
 
+  def getDdlProperties[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F],
+                                                           propertiesCache: PropertiesCache[F],
+                                                           schemaKey: SchemaKey): EitherT[F, FailureDetails.LoaderIgluError, Properties] = {
+    val criterion = SchemaCriterion(schemaKey.vendor, schemaKey.name, "jsonschema", Some(schemaKey.version.model), None, None)
+
+    EitherT(resolver.listSchemasResult(schemaKey.vendor, schemaKey.name, schemaKey.version.model))
+      .leftMap(error => FailureDetails.LoaderIgluError.SchemaListNotFound(criterion, error))
+      .flatMap {
+        case cached: Cached[ListSchemasKey, SchemaList] =>
+          lookupInCache(resolver, propertiesCache, cached)
+        case NotCached(schemaList) =>
+          evaluateProperties(schemaList, resolver)
+      }
+  }
+
   def fetch[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F])(key: SchemaKey): EitherT[F, FailureDetails.LoaderIgluError, IgluSchema] =
     for {
       json <- EitherT(resolver.lookupSchema(key)).leftMap(error => FailureDetails.LoaderIgluError.IgluError(key, error))
       schema <- EitherT.fromEither(parseSchema(json))
     } yield schema
 
+  private def lookupInCache[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F],
+                                                                propertiesCache: PropertiesCache[F],
+                                                                resolvedSchemaList: Cached[ListSchemasKey, SchemaList]): EitherT[F, FailureDetails.LoaderIgluError, Properties] = {
+    val propertiesKey = (resolvedSchemaList.key, resolvedSchemaList.timestamp)
+
+    EitherT.liftF(propertiesCache.get(propertiesKey)).flatMap {
+      case Some(properties) =>
+        EitherT.pure[F, FailureDetails.LoaderIgluError](properties)
+      case None =>
+        evaluateProperties(resolvedSchemaList.value, resolver)
+          .semiflatTap(props => propertiesCache.put(propertiesKey, props))
+    }
+  }
+
+  private def evaluateProperties[F[_]: Monad: RegistryLookup: Clock](schemaList: SchemaList,
+                                                                     resolver: Resolver[F]): EitherT[F, FailureDetails.LoaderIgluError, Properties] = {
+    DdlSchemaList.fromSchemaList(schemaList, fetch(resolver))
+      .map(FlatSchema.extractProperties)
+  }
+
   /** Parse JSON into self-describing schema, or return `FlatteningError` */
   private def parseSchema(json: Json): Either[FailureDetails.LoaderIgluError, IgluSchema] =
     for {
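Why bake the timestamp into the key instead of giving the properties cache its own TTL? Because lookupInCache above keys on the exact moment the resolver cached the schema list: once the resolver re-fetches the list, the timestamp changes, the lookup misses, and the properties are recomputed from the fresh list. Stale entries are never read again and simply age out of the LRU map. A small sketch of that behaviour, not part of the commit (timestamps mirror the spec below):

import com.snowplowanalytics.iglu.client.resolver.Resolver.ListSchemasKey
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.PropertiesKey

object PropertiesKeySketch {
  val listKey: ListSchemasKey = ("com.snowplowanalytics.snowplow", "test_schema", 1)

  val cachedAtSecond1: PropertiesKey  = (listKey, 1)  // schema list first cached at second 1
  val cachedAtSecond13: PropertiesKey = (listKey, 13) // list re-fetched after the resolver's TTL

  // Different timestamps mean different keys, so properties derived from the
  // old schema list are never served against the refreshed one.
  assert(cachedAtSecond1 != cachedAtSecond13)
}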
Transformed.scala
@@ -94,14 +94,15 @@ object Transformed {
    * @return either bad row (in case of failed flattening) or list of shredded entities inside original event
    */
   def shredEvent[F[_]: Monad: RegistryLookup: Clock](igluClient: Client[F, CJson],
+                                                     propertiesCache: PropertiesCache[F],
                                                      isTabular: SchemaKey => Boolean,
                                                      atomicLengths: Map[String, Int],
                                                      processor: Processor)
                                                     (event: Event): EitherT[F, BadRow, List[Transformed]] =
     Hierarchy.fromEvent(event)
       .traverse { hierarchy =>
         val tabular = isTabular(hierarchy.entity.schema)
-        fromHierarchy(tabular, igluClient.resolver)(hierarchy)
+        fromHierarchy(tabular, igluClient.resolver, propertiesCache)(hierarchy)
       }
       .leftMap { error => EventUtils.shreddingBadRow(event, processor)(NonEmptyList.one(error)) }
       .map { shredded =>
@@ -122,11 +123,14 @@
    * @param resolver Iglu resolver to request all necessary entities
    * @param hierarchy actual JSON hierarchy from an enriched event
    */
-  private def fromHierarchy[F[_]: Monad: RegistryLookup: Clock](tabular: Boolean, resolver: => Resolver[F])(hierarchy: Hierarchy): EitherT[F, FailureDetails.LoaderIgluError, Transformed] = {
+  private def fromHierarchy[F[_]: Monad: RegistryLookup: Clock](tabular: Boolean,
+                                                                resolver: => Resolver[F],
+                                                                propertiesCache: PropertiesCache[F])
+                                                               (hierarchy: Hierarchy): EitherT[F, FailureDetails.LoaderIgluError, Transformed] = {
     val vendor = hierarchy.entity.schema.vendor
     val name = hierarchy.entity.schema.name
     if (tabular) {
-      EventUtils.flatten(resolver, hierarchy.entity).map { columns =>
+      EventUtils.flatten(resolver, propertiesCache, hierarchy.entity).map { columns =>
         val meta = EventUtils.buildMetadata(hierarchy.eventId, hierarchy.collectorTstamp, hierarchy.entity.schema)
         Shredded.Tabular(vendor, name, hierarchy.entity.schema.version.model, Transformed.Data.DString((meta ++ columns).mkString("\t")))
       }
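For callers, the only change is threading the cache through shredEvent. A hypothetical wiring sketch, not part of the commit — the CJson alias is assumed to be io.circe.Json and the package path of Transformed is assumed; everything else is the caller's own:

import cats.Monad
import cats.data.EitherT
import cats.effect.Clock
import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, Transformed}
import io.circe.Json

object ShredWiringSketch {
  // Create the PropertiesCache once at startup and reuse it for every event;
  // partially applying shredEvent yields a per-event shredding function.
  def mkShred[F[_]: Monad: RegistryLookup: Clock](igluClient: Client[F, Json],
                                                  propertiesCache: PropertiesCache[F],
                                                  isTabular: SchemaKey => Boolean,
                                                  atomicLengths: Map[String, Int],
                                                  processor: Processor): Event => EitherT[F, BadRow, List[Transformed]] =
    Transformed.shredEvent(igluClient, propertiesCache, isTabular, atomicLengths, processor)
}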
transformation/package.scala
@@ -14,13 +14,27 @@
  */
 package com.snowplowanalytics.snowplow.rdbloader.common
 
+import com.snowplowanalytics.iglu.client.resolver.Resolver.ListSchemasKey
+
 import java.time.{Instant, ZoneOffset}
 import java.time.format.DateTimeFormatter
+import com.snowplowanalytics.lrumap.LruMap
+import com.snowplowanalytics.iglu.schemaddl.Properties
 
 package object transformation {
 
   private val Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
 
+  /**
+   * Used to cache properties for the 'flatten' operation.
+   * The compound key with a timestamp keeps the Iglu schema-list cache and the properties cache in sync.
+   * See https://github.com/snowplow/snowplow-rdb-loader/issues/1086
+   * and the 'CachedFlatteningSpec' test to see this in action.
+   */
+  type SchemaListCachingTime = Int
+  type PropertiesKey = (ListSchemasKey, SchemaListCachingTime)
+  type PropertiesCache[F[_]] = LruMap[F, PropertiesKey, Properties]
+
   implicit class InstantOps(time: Instant) {
     def formatted: String = {
       time.atOffset(ZoneOffset.UTC).format(Formatter)
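Creating the cache is a one-liner with Snowplow's lrumap, exactly as the spec below does with Id; the capacity of 100 is the spec's choice, not a requirement:

import cats.Id
import com.snowplowanalytics.iglu.schemaddl.Properties
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{PropertiesCache, PropertiesKey}

object CacheCreationSketch {
  // A bounded LRU map: eviction of old (listKey, timestamp) entries is how
  // properties for expired schema lists eventually get dropped.
  val cache: PropertiesCache[Id] = CreateLruMap[Id, PropertiesKey, Properties].create(100)
}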
CachedFlatteningSpec.scala (new file)
@@ -0,0 +1,202 @@
package com.snowplowanalytics.snowplow.rdbloader.common

import cats.effect.Clock
import cats.{Id, Monad}
import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.{Registry, RegistryError, RegistryLookup}
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaList, SelfDescribingData}
import com.snowplowanalytics.iglu.schemaddl.Properties
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, PropertiesCache, PropertiesKey}
import io.circe.Json
import io.circe.literal.JsonStringContext
import org.specs2.mutable.Specification

import scala.concurrent.duration.{MILLISECONDS, NANOSECONDS, TimeUnit}

class CachedFlatteningSpec extends Specification {

  // single 'field1' field
  val `original schema - 1 field`: Json =
    json"""
      {
        "$$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
        "description": "Test schema 1",
        "self": {
          "vendor": "com.snowplowanalytics.snowplow",
          "name": "test_schema",
          "format": "jsonschema",
          "version": "1-0-0"
        },
        "type": "object",
        "properties": {
          "field1": { "type": "string" }
        }
      }
    """

  // same key as schema 1, but with an additional 'field2' field
  val `patched schema - 2 fields`: Json =
    json"""
      {
        "$$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
        "description": "Test schema 1",
        "self": {
          "vendor": "com.snowplowanalytics.snowplow",
          "name": "test_schema",
          "format": "jsonschema",
          "version": "1-0-0"
        },
        "type": "object",
        "properties": {
          "field1": { "type": "string" },
          "field2": { "type": "integer" }
        }
      }
    """

  val cacheTtl = 10 // seconds
  val dataToFlatten = json"""{ "field1": "1", "field2": 2 }"""
  val schemaKey = "iglu:com.snowplowanalytics.snowplow/test_schema/jsonschema/1-0-0"

  // (vendor, name, model)
  val propertiesKey = ("com.snowplowanalytics.snowplow", "test_schema", 1)

  "Cached properties during flattening should be in sync with cached schemas/lists in iglu client" >> {

    "(1) original schema only, 1 flatten call => 1 field flattened" in {
      val propertiesCache = getCache
      val result = flatten(propertiesCache, getResolver)(
        currentTime = 1000, // ms
        schemaInRegistry = `original schema - 1 field`
      )

      result must beEqualTo(List("1"))

      // Properties are cached after the first call (second 1)
      propertiesCache.get((propertiesKey, 1)) must beSome
    }

    "(2) original schema is patched between calls, no delay => original schema is still cached => 1 field flattened" in {
      val propertiesCache = getCache
      val resolver = getResolver

      // first call
      flatten(propertiesCache, resolver)(
        currentTime = 1000, // ms
        schemaInRegistry = `original schema - 1 field`
      )

      // second call, same time
      val result = flatten(propertiesCache, resolver)(
        currentTime = 1000, // ms
        schemaInRegistry = `patched schema - 2 fields` // different schema with the same key!
      )

      // no data from the patched schema
      result must beEqualTo(List("1"))

      // Properties are cached after the first call (second 1)
      propertiesCache.get((propertiesKey, 1)) must beSome
    }

    "(3) schema is patched, delay between flatten calls is less than cache TTL => original schema is still cached => 1 field flattened" in {
      val propertiesCache = getCache
      val resolver = getResolver

      // first call
      flatten(propertiesCache, resolver)(
        currentTime = 1000, // ms
        schemaInRegistry = `original schema - 1 field`
      )

      // second call, 2s later, less than the 10s TTL
      val result = flatten(propertiesCache, resolver)(
        currentTime = 3000, // ms
        schemaInRegistry = `patched schema - 2 fields` // different schema with the same key!
      )

      // no data from the patched schema
      result must beEqualTo(List("1"))

      // Properties are cached after the first call (second 1)
      propertiesCache.get((propertiesKey, 1)) must beSome

      // Properties are not cached after the second call (second 3)
      propertiesCache.get((propertiesKey, 3)) must beNone
    }

    "(4) schema is patched, delay between flatten calls is greater than cache TTL => original schema is expired => using patched schema => 2 fields flattened" in {
      val propertiesCache = getCache
      val resolver = getResolver

      // first call
      flatten(propertiesCache, resolver)(
        currentTime = 1000, // ms
        schemaInRegistry = `original schema - 1 field`
      )

      // second call, 12s later, greater than the 10s TTL
      val result = flatten(propertiesCache, resolver)(
        currentTime = 13000, // ms
        schemaInRegistry = `patched schema - 2 fields` // different schema with the same key!
      )

      // Cache content expired, the patched schema is fetched => 2 fields flattened
      result must beEqualTo(List("1", "2"))

      // Properties are cached after the first call (second 1)
      propertiesCache.get((propertiesKey, 1)) must beSome

      // Properties are cached after the second call (second 13)
      propertiesCache.get((propertiesKey, 13)) must beSome
    }
  }

  // Helper method wiring all test dependencies and executing EventUtils.flatten
  private def flatten(propertiesCache: PropertiesCache[Id], resolver: Resolver[Id])
                     (currentTime: Long,
                      schemaInRegistry: Json): List[String] = {

    // Lookup always returns the value of the schemaInRegistry parameter; the passed registry is ignored
    val testRegistryLookup: RegistryLookup[Id] = new RegistryLookup[Id] {
      override def lookup(registry: Registry,
                          schemaKey: SchemaKey): Id[Either[RegistryError, Json]] =
        Right(schemaInRegistry)

      override def list(registry: Registry,
                        vendor: String,
                        name: String,
                        model: Int): Id[Either[RegistryError, SchemaList]] =
        Right(SchemaList(List(SchemaKey.fromUri("iglu:com.snowplowanalytics.snowplow/test_schema/jsonschema/1-0-0").right.get)))
    }

    val staticClock: Clock[Id] = new Clock[Id] {
      override def realTime(unit: TimeUnit): Id[Long] =
        unit.convert(currentTime, MILLISECONDS)

      override def monotonic(unit: TimeUnit): Id[Long] =
        unit.convert(currentTime * 1000000, NANOSECONDS)
    }

    val data = SelfDescribingData(schema = SchemaKey.fromUri(schemaKey).right.get, data = dataToFlatten)

    EventUtils.flatten(resolver, propertiesCache, data)(Monad[Id], testRegistryLookup, staticClock).value.right.get
  }

  private def getCache: PropertiesCache[Id] = CreateLruMap[Id, PropertiesKey, Properties].create(100)

  private def getResolver: Resolver[Id] = {
    Resolver.init[Id](
      cacheSize = 10,
      cacheTtl = Some(cacheTtl),
      refs = Registry.Embedded( // not used in the test, as the custom RegistryLookup fixes the returned schema
        Registry.Config("Test", 0, List.empty),
        path = "/fake"
      )
    )
  }
}