Skip to content

Commit

Permalink
Take superseding schema into account during validation (close #751)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Apr 18, 2023
1 parent d552d49 commit 273ea49
Show file tree
Hide file tree
Showing 19 changed files with 494 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,12 @@ class EventGenEtlPipelineSpec extends Specification with CatsIO {
.map(_.toJson(false))
.map(_.foldWith(folder))
.map(_.noSpaces)
.map(
_.replaceAll(
"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0",
"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"
)
)
.map(decode[IntermediateEvent])
.rethrow
.map(IntermediateEvent.pad)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import cats.data.Validated.{Invalid, Valid}

import io.circe.Json
import io.circe.syntax._
import io.circe.parser.{parse => jparse}

import org.apache.thrift.TSerializer

Expand Down Expand Up @@ -116,10 +117,18 @@ object BlackBoxTesting extends Specification with CatsIO {

private def checkEnriched(enriched: EnrichedEvent, expectedFields: Map[String, String]) = {
val asMap = getMap(enriched)
val r = expectedFields.map { case (k, v) => asMap.get(k) must beSome(v) }
val r = expectedFields.map {
case (k, v) if k == "unstruct_event" || k == "contexts" || k == "derived_contexts" =>
compareJsons(asMap.getOrElse(k, ""), v) must beTrue
case (k, v) =>
asMap.get(k) must beSome(v)
}
r.toList.reduce(_ and _)
}

private def compareJsons(j1: String, j2: String): Boolean =
j1 == j2 || jparse(j1).toOption.get == jparse(j2).toOption.get

private val enrichedFields = classOf[EnrichedEvent].getDeclaredFields()
enrichedFields.foreach(_.setAccessible(true))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ApiRequestEnrichmentSpec extends Specification with CatsIO {
)
val expected = Map(
"unstruct_event" -> json"""{"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow-website/signup_form_submitted/jsonschema/1-0-0","data":{"name":"Bob®","email":"[email protected]","company":"SP","eventsPerMonth":"< 1 million","serviceType":"unsure"}}}""".noSpaces,
"contexts" -> json"""{"data":[{"data":{"osType":"OSX","appleIdfa":"some_appleIdfa","openIdfa":"some_Idfa","carrier":"some_carrier","deviceModel":"large","osVersion":"3.0.0","appleIdfa":"some_appleIdfa","androidIdfa":"some_androidIdfa","deviceManufacturer":"Amstrad"},"schema":"iglu:com.snowplowanalytics.snowplow/mobile_context/jsonschema/1-0-0"},{"data":{"longitude":10,"bearing":50,"speed":16,"altitude":20,"altitudeAccuracy":0.3,"latitudeLongitudeAccuracy":0.5,"latitude":7},"schema":"iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-0-0"}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0"}""".noSpaces,
"contexts" -> json"""{"data":[{"data":{"osType":"OSX","appleIdfa":"some_appleIdfa","openIdfa":"some_Idfa","carrier":"some_carrier","deviceModel":"large","osVersion":"3.0.0","appleIdfa":"some_appleIdfa","androidIdfa":"some_androidIdfa","deviceManufacturer":"Amstrad"},"schema":"iglu:com.snowplowanalytics.snowplow/mobile_context/jsonschema/1-0-0"},{"data":{"longitude":10,"bearing":50,"speed":16,"altitude":20,"altitudeAccuracy":0.3,"latitudeLongitudeAccuracy":0.5,"latitude":7},"schema":"iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-0-0"}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"}""".noSpaces,
"derived_contexts" -> json"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.statusgator/status_change/jsonschema/1-0-0","data":{"serviceName": "sp-api-request-enrichment"}}]}""".noSpaces
)
BlackBoxTesting.runTest(input, expected, Some(ApiRequestEnrichmentSpec.conf))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class TransactionSpec extends Specification with CatsIO {
"txn_id" -> "28288",
"tr_country" -> "UK",
"tr_city" -> "London",
"contexts" -> json"""{"data":[{"schema":"iglu:com.snowplowanalytics.snowplow/uri_redirect/jsonschema/1-0-0","data":{"uri":"http://snowplowanalytics.com/"}}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0"}""".noSpaces
"contexts" -> json"""{"data":[{"schema":"iglu:com.snowplowanalytics.snowplow/uri_redirect/jsonschema/1-0-0","data":{"uri":"http://snowplowanalytics.com/"}}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"}""".noSpaces
)
BlackBoxTesting.runTest(input, expected)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ object EnrichmentManager {
unstructEvent,
featureFlags.legacyEnrichmentOrder
)
_ <- EitherT.rightT[F, BadRow] {
if (enrichmentsContexts.nonEmpty)
enriched.derived_contexts = ME.formatDerivedContexts(enrichmentsContexts)
}
_ = {
ME.formatUnstructEvent(unstructEvent).foreach(e => enriched.unstruct_event = e)
ME.formatContexts(inputContexts).foreach(c => enriched.contexts = c)
ME.formatContexts(enrichmentsContexts).foreach(c => enriched.derived_contexts = c)
}
_ <- IgluUtils
.validateEnrichmentsContexts[F](client, enrichmentsContexts, raw, processor, enriched)
_ <- EitherT.rightT[F, BadRow](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ object MiscEnrichments {
val ContextsSchema =
SchemaKey("com.snowplowanalytics.snowplow", "contexts", "jsonschema", SchemaVer.Full(1, 0, 1))

val UnstructEventSchema =
SchemaKey("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", SchemaVer.Full(1, 0, 0))

/**
* The version of this ETL. Appends this version to the supplied "host" ETL.
* @param processor The version of the host ETL running this library
Expand Down Expand Up @@ -82,6 +85,11 @@ object MiscEnrichments {
}

/** Turn a list of custom contexts into a self-describing JSON property */
def formatDerivedContexts(derivedContexts: List[SelfDescribingData[Json]]): String =
SelfDescribingData(ContextsSchema, Json.arr(derivedContexts.map(_.normalize): _*)).asString
def formatContexts(contexts: List[SelfDescribingData[Json]]): Option[String] =
if (contexts.isEmpty) None
else Some(SelfDescribingData(ContextsSchema, Json.arr(contexts.map(_.normalize): _*)).asString)

/** Turn a unstruct event into a self-describing JSON property */
def formatUnstructEvent(unstructEvent: Option[SelfDescribingData[Json]]): Option[String] =
unstructEvent.map(e => SelfDescribingData(UnstructEventSchema, e.normalize).asString)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ import cats.data.{EitherT, NonEmptyList, Validated, ValidatedNel}
import cats.effect.Clock
import cats.implicits._

import io.circe.Json
import io.circe._
import io.circe.syntax._
import io.circe.generic.semiauto._

import java.time.Instant

import com.snowplowanalytics.iglu.client.{ClientError, IgluCirceClient}
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.badrows._
Expand Down Expand Up @@ -71,7 +73,11 @@ object IgluUtils {
.map(_.toValidatedNel)
} yield (contexts, unstruct)
.mapN { (c, ue) =>
(c, ue)
val validationInfoContexts = (c.flatMap(_.validationInfo) ::: ue.flatMap(_.validationInfo).toList).distinct
.map(_.toSdj)
val contexts = c.map(_.sdj) ::: validationInfoContexts
val unstruct = ue.map(_.sdj)
(contexts, unstruct)
}
.leftMap { schemaViolations =>
buildSchemaViolationsBadRow(
Expand All @@ -98,27 +104,17 @@ object IgluUtils {
client: IgluCirceClient[F],
field: String = "ue_properties",
criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", 1, 0)
): F[Validated[FailureDetails.SchemaViolation, Option[SelfDescribingData[Json]]]] =
): F[Validated[FailureDetails.SchemaViolation, Option[SdjExtractResult]]] =
(Option(enriched.unstruct_event) match {
case Some(rawUnstructEvent) =>
for {
// Validate input Json string and extract unstructured event
unstruct <- extractInputData(rawUnstructEvent, field, criterion, client)
// Parse Json unstructured event as SelfDescribingData[Json]
unstructSDJ <- SelfDescribingData
.parse(unstruct)
.leftMap(FailureDetails.SchemaViolation.NotIglu(unstruct, _))
.toEitherT[F]
// Check SelfDescribingData[Json] of unstructured event
_ <- check(client, unstructSDJ)
.leftMap {
case (schemaKey, clientError) =>
FailureDetails.SchemaViolation.IgluError(schemaKey, clientError)
}
.leftWiden[FailureDetails.SchemaViolation]
unstructSDJ <- parseAndValidateSDJ_sv(unstruct, client)
} yield unstructSDJ.some
case None =>
EitherT.rightT[F, FailureDetails.SchemaViolation](none[SelfDescribingData[Json]])
EitherT.rightT[F, FailureDetails.SchemaViolation](none[SdjExtractResult])
}).toValidated

/**
Expand All @@ -135,7 +131,7 @@ object IgluUtils {
client: IgluCirceClient[F],
field: String = "contexts",
criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "contexts", "jsonschema", 1, 0)
): F[ValidatedNel[FailureDetails.SchemaViolation, List[SelfDescribingData[Json]]]] =
): F[ValidatedNel[FailureDetails.SchemaViolation, List[SdjExtractResult]]] =
(Option(enriched.contexts) match {
case Some(rawContexts) =>
for {
Expand All @@ -153,7 +149,7 @@ object IgluUtils {
} yield contextsSDJ
case None =>
EitherT.rightT[F, NonEmptyList[FailureDetails.SchemaViolation]](
List.empty[SelfDescribingData[Json]]
List.empty[SdjExtractResult]
)
}).toValidated

Expand Down Expand Up @@ -238,10 +234,10 @@ object IgluUtils {
private def check[F[_]: Monad: RegistryLookup: Clock](
client: IgluCirceClient[F],
sdj: SelfDescribingData[Json]
): EitherT[F, (SchemaKey, ClientError), Unit] =
): EitherT[F, (SchemaKey, ClientError), Option[SchemaVer.Full]] =
client
.check(sdj)
.leftMap(clientErr => (sdj.schema, clientErr))
.leftMap((sdj.schema, _))

/** Check a list of SDJs and merge the Iglu errors */
private def checkList[F[_]: Monad: RegistryLookup: Clock](
Expand All @@ -259,20 +255,48 @@ object IgluUtils {
private def parseAndValidateSDJ_sv[F[_]: Monad: RegistryLookup: Clock]( // _sv for SchemaViolation
json: Json,
client: IgluCirceClient[F]
): EitherT[F, FailureDetails.SchemaViolation, SelfDescribingData[Json]] =
): EitherT[F, FailureDetails.SchemaViolation, SdjExtractResult] =
for {
sdj <- SelfDescribingData
.parse(json)
.leftMap(FailureDetails.SchemaViolation.NotIglu(json, _))
.toEitherT[F]
_ <- check(client, sdj)
.leftMap {
case (schemaKey, clientError) =>
FailureDetails.SchemaViolation
.IgluError(schemaKey, clientError): FailureDetails.SchemaViolation
supersedingSchema <- check(client, sdj)
.leftMap {
case (schemaKey, clientError) =>
FailureDetails.SchemaViolation
.IgluError(schemaKey, clientError): FailureDetails.SchemaViolation

}
} yield sdj
}
validationInfo = supersedingSchema.map(s => ValidationInfo(sdj.schema, s))
sdjUpdated = replaceSchemaVersion(sdj, validationInfo)
} yield SdjExtractResult(sdjUpdated, validationInfo)

private def replaceSchemaVersion(
sdj: SelfDescribingData[Json],
validationInfo: Option[ValidationInfo]
): SelfDescribingData[Json] =
validationInfo match {
case None => sdj
case Some(s) => sdj.copy(schema = sdj.schema.copy(version = s.validatedWith))
}

case class ValidationInfo(originalSchema: SchemaKey, validatedWith: SchemaVer.Full) {
def toSdj: SelfDescribingData[Json] =
SelfDescribingData(ValidationInfo.schemaKey, (this: ValidationInfo).asJson)
}

object ValidationInfo {
val schemaKey = SchemaKey("com.snowplowanalytics.iglu", "validation_info", "jsonschema", SchemaVer.Full(1, 0, 0))

implicit val schemaVerFullEncoder: Encoder[SchemaVer.Full] =
Encoder.encodeString.contramap(v => v.asString)

implicit val validationInfoEncoder: Encoder[ValidationInfo] =
deriveEncoder[ValidationInfo]
}

case class SdjExtractResult(sdj: SelfDescribingData[Json], validationInfo: Option[ValidationInfo])

/** Build `BadRow.SchemaViolations` from a list of `FailureDetails.SchemaViolation`s */
def buildSchemaViolationsBadRow(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Superseding schema example",
"$supersededBy": "1-0-1",
"self": {
"vendor": "com.acme",
"name": "superseding_schema",
"format": "jsonschema",
"version": "1-0-0"
},
"type": "object",
"properties": {
"field_a": {
"type": "string"
},
"field_b": {
"type": "string"
},
"field_c": {
"type": "string"
}
},
"required": ["field_a", "field_b"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Superseding schema example",
"self": {
"vendor": "com.acme",
"name": "superseding_schema",
"format": "jsonschema",
"version": "1-0-1"
},
"type": "object",
"properties": {
"field_a": {
"type": "string"
},
"field_b": {
"type": "string"
},
"field_c": {
"type": "string"
},
"field_d": {
"type": "string"
}
},
"required": ["field_a", "field_b"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Superseding schema example",
"$supersededBy": "2-0-1",
"self": {
"vendor": "com.acme",
"name": "superseding_schema",
"format": "jsonschema",
"version": "2-0-0"
},
"type": "object",
"properties": {
"field_e": {
"type": "string"
},
"field_f": {
"type": "string"
}
},
"required": ["field_e", "field_f"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Superseding schema example",
"self": {
"vendor": "com.acme",
"name": "superseding_schema",
"format": "jsonschema",
"version": "2-0-1"
},
"type": "object",
"properties": {
"field_e": {
"type": "string"
},
"field_f": {
"type": "string"
},
"field_g": {
"type": "string"
}
},
"required": ["field_e", "field_f"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,8 @@ class SnowplowAdapterSpec extends Specification with DataTables with ValidatedMa
"required".some
)
)
)
),
None
)
)
)
Expand All @@ -353,7 +354,8 @@ class SnowplowAdapterSpec extends Specification with DataTables with ValidatedMa
"type".some
)
)
)
),
None
)
)
) |
Expand Down Expand Up @@ -381,7 +383,8 @@ class SnowplowAdapterSpec extends Specification with DataTables with ValidatedMa
"required".some
)
)
)
),
None
)
)
) |
Expand Down Expand Up @@ -422,7 +425,8 @@ class SnowplowAdapterSpec extends Specification with DataTables with ValidatedMa
"additionalProperties".some
)
)
)
),
None
)
)
) |> { (_, json, expected) =>
Expand Down
Loading

0 comments on commit 273ea49

Please sign in to comment.