Skip to content

Commit

Permalink
Take superseding schema into account during validation (close #751)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Mar 17, 2023
1 parent bb71d94 commit 5f71486
Show file tree
Hide file tree
Showing 20 changed files with 493 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,12 @@ class EventGenEtlPipelineSpec extends Specification with CatsIO {
.map(_.toJson(false))
.map(_.foldWith(folder))
.map(_.noSpaces)
.map(
_.replaceAll(
"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0",
"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"
)
)
.map(decode[IntermediateEvent])
.rethrow
.map(IntermediateEvent.pad)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import cats.data.Validated.{Invalid, Valid}

import io.circe.Json
import io.circe.syntax._
import io.circe.parser.{parse => jparse}

import org.apache.thrift.TSerializer

Expand Down Expand Up @@ -116,10 +117,18 @@ object BlackBoxTesting extends Specification with CatsIO {

private def checkEnriched(enriched: EnrichedEvent, expectedFields: Map[String, String]) = {
val asMap = getMap(enriched)
val r = expectedFields.map { case (k, v) => asMap.get(k) must beSome(v) }
val r = expectedFields.map {
case (k, v) if k == "unstruct_event" || k == "contexts" || k == "derived_contexts" =>
compareJsons(asMap.getOrElse(k, ""), v) must beTrue
case (k, v) =>
asMap.get(k) must beSome(v)
}
r.toList.reduce(_ and _)
}

/**
 * Compare two JSON documents given as strings.
 *
 * Identical strings are considered equal without being parsed. Otherwise both
 * strings are parsed and the resulting JSON values are compared, so that purely
 * textual differences do not matter.
 *
 * @param j1 first JSON document, as a string
 * @param j2 second JSON document, as a string
 * @return `true` if the strings are identical or both parse to equal JSON values;
 *         `false` otherwise, including when either string is not valid JSON
 *         (e.g. an empty string standing in for a missing field)
 */
private def compareJsons(j1: String, j2: String): Boolean =
  j1 == j2 || ((jparse(j1), jparse(j2)) match {
    // Only two successfully parsed documents can be equal; a parsing failure is a
    // mismatch rather than an exception, so the calling matcher reports a proper failure
    case (Right(parsed1), Right(parsed2)) => parsed1 == parsed2
    case _ => false
  })

private val enrichedFields = classOf[EnrichedEvent].getDeclaredFields()
enrichedFields.foreach(_.setAccessible(true))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ApiRequestEnrichmentSpec extends Specification with CatsIO {
)
val expected = Map(
"unstruct_event" -> json"""{"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow-website/signup_form_submitted/jsonschema/1-0-0","data":{"name":"Bob®","email":"[email protected]","company":"SP","eventsPerMonth":"< 1 million","serviceType":"unsure"}}}""".noSpaces,
"contexts" -> json"""{"data":[{"data":{"osType":"OSX","appleIdfa":"some_appleIdfa","openIdfa":"some_Idfa","carrier":"some_carrier","deviceModel":"large","osVersion":"3.0.0","appleIdfa":"some_appleIdfa","androidIdfa":"some_androidIdfa","deviceManufacturer":"Amstrad"},"schema":"iglu:com.snowplowanalytics.snowplow/mobile_context/jsonschema/1-0-0"},{"data":{"longitude":10,"bearing":50,"speed":16,"altitude":20,"altitudeAccuracy":0.3,"latitudeLongitudeAccuracy":0.5,"latitude":7},"schema":"iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-0-0"}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0"}""".noSpaces,
"contexts" -> json"""{"data":[{"data":{"osType":"OSX","appleIdfa":"some_appleIdfa","openIdfa":"some_Idfa","carrier":"some_carrier","deviceModel":"large","osVersion":"3.0.0","appleIdfa":"some_appleIdfa","androidIdfa":"some_androidIdfa","deviceManufacturer":"Amstrad"},"schema":"iglu:com.snowplowanalytics.snowplow/mobile_context/jsonschema/1-0-0"},{"data":{"longitude":10,"bearing":50,"speed":16,"altitude":20,"altitudeAccuracy":0.3,"latitudeLongitudeAccuracy":0.5,"latitude":7},"schema":"iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-0-0"}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"}""".noSpaces,
"derived_contexts" -> json"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.statusgator/status_change/jsonschema/1-0-0","data":{"serviceName": "sp-api-request-enrichment"}}]}""".noSpaces
)
BlackBoxTesting.runTest(input, expected, Some(ApiRequestEnrichmentSpec.conf))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class TransactionSpec extends Specification with CatsIO {
"txn_id" -> "28288",
"tr_country" -> "UK",
"tr_city" -> "London",
"contexts" -> json"""{"data":[{"schema":"iglu:com.snowplowanalytics.snowplow/uri_redirect/jsonschema/1-0-0","data":{"uri":"http://snowplowanalytics.com/"}}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0"}""".noSpaces
"contexts" -> json"""{"data":[{"schema":"iglu:com.snowplowanalytics.snowplow/uri_redirect/jsonschema/1-0-0","data":{"uri":"http://snowplowanalytics.com/"}}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"}""".noSpaces
)
BlackBoxTesting.runTest(input, expected)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,13 @@ object Tp2Adapter extends Adapter {
)
_ <- client
.check(sd)
.leftMap(e =>
NonEmptyList.one(
FailureDetails.TrackerProtocolViolation
.IgluError(sd.schema, e)
)
)
.leftMap({
case (e, _) =>
NonEmptyList.one(
FailureDetails.TrackerProtocolViolation
.IgluError(sd.schema, e)
)
})
.subflatMap { _ =>
schemaCriterion.matches(sd.schema) match {
case true => ().asRight
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ object EnrichmentManager {
unstructEvent,
featureFlags.legacyEnrichmentOrder
)
_ <- EitherT.rightT[F, BadRow] {
if (enrichmentsContexts.nonEmpty)
enriched.derived_contexts = ME.formatDerivedContexts(enrichmentsContexts)
}
_ = {
ME.formatUnstructEvent(unstructEvent).foreach(e => enriched.unstruct_event = e)
ME.formatContexts(inputContexts).foreach(c => enriched.contexts = c)
ME.formatContexts(enrichmentsContexts).foreach(c => enriched.derived_contexts = c)
}
_ <- IgluUtils
.validateEnrichmentsContexts[F](client, enrichmentsContexts, raw, processor, enriched)
_ <- EitherT.rightT[F, BadRow](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ object EnrichmentRegistry {
)
_ <- client
.check(sd)
.leftMap(e => NonEmptyList.one(e.asJson.noSpaces))
.leftMap({ case (e, _) => NonEmptyList.one(e.asJson.noSpaces) })
.subflatMap { _ =>
EnrichmentConfigSchemaCriterion.matches(sd.schema) match {
case true => ().asRight
Expand All @@ -92,9 +92,10 @@ object EnrichmentRegistry {
)
_ <- client
.check(sd)
.leftMap(e =>
NonEmptyList.one(s"Enrichment with key '${sd.schema.toSchemaUri}` is invalid - ${e.asJson.noSpaces}")
)
.leftMap({
case (e, _) =>
NonEmptyList.one(s"Enrichment with key '${sd.schema.toSchemaUri}` is invalid - ${e.asJson.noSpaces}")
})
conf <- EitherT.fromEither[F](
buildEnrichmentConfig(sd.schema, sd.data, localMode).toEither
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ object MiscEnrichments {
/** Schema key `com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1`, used to wrap lists of contexts */
val ContextsSchema =
SchemaKey("com.snowplowanalytics.snowplow", "contexts", "jsonschema", SchemaVer.Full(1, 0, 1))

/** Schema key `com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0`, used to wrap an unstruct event */
val UnstructEventSchema =
SchemaKey("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", SchemaVer.Full(1, 0, 0))

/**
* The version of this ETL. Appends this version to the supplied "host" ETL.
* @param processor The version of the host ETL running this library
Expand Down Expand Up @@ -82,6 +85,11 @@ object MiscEnrichments {
}

/** Turn a list of custom contexts into a self-describing JSON property */
def formatDerivedContexts(derivedContexts: List[SelfDescribingData[Json]]): String =
SelfDescribingData(ContextsSchema, Json.arr(derivedContexts.map(_.normalize): _*)).asString
/**
 * Wrap a list of contexts into a single self-describing JSON string.
 *
 * @param contexts the contexts to wrap
 * @return `None` when `contexts` is empty; otherwise the string form of a
 *         self-describing JSON with schema `ContextsSchema` whose data is the
 *         array of the normalized contexts
 */
def formatContexts(contexts: List[SelfDescribingData[Json]]): Option[String] =
  contexts match {
    case Nil => None
    case nonEmpty =>
      val payload = Json.arr(nonEmpty.map(_.normalize): _*)
      Some(SelfDescribingData(ContextsSchema, payload).asString)
  }

/**
 * Turn an unstruct event into a self-describing JSON property.
 *
 * @param unstructEvent the unstruct event, if there is one
 * @return `None` when there is no unstruct event; otherwise the string form of
 *         a self-describing JSON with schema `UnstructEventSchema` whose data
 *         is the normalized event
 */
def formatUnstructEvent(unstructEvent: Option[SelfDescribingData[Json]]): Option[String] =
  for {
    event <- unstructEvent
  } yield SelfDescribingData(UnstructEventSchema, event.normalize).asString
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ import cats.data.{EitherT, NonEmptyList, Validated, ValidatedNel}
import cats.effect.Clock
import cats.implicits._

import io.circe.Json
import io.circe._
import io.circe.syntax._
import io.circe.generic.semiauto._

import java.time.Instant

import com.snowplowanalytics.iglu.client.{ClientError, IgluCirceClient}
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.badrows._
Expand Down Expand Up @@ -71,7 +73,11 @@ object IgluUtils {
.map(_.toValidatedNel)
} yield (contexts, unstruct)
.mapN { (c, ue) =>
(c, ue)
val supersedingInfoContexts = (c.flatMap(_.supersedingInfo) ::: ue.flatMap(_.supersedingInfo).toList).distinct
.map(_.toSdj)
val contexts = c.map(_.sdj) ::: supersedingInfoContexts
val unstruct = ue.map(_.sdj)
(contexts, unstruct)
}
.leftMap { schemaViolations =>
buildSchemaViolationsBadRow(
Expand All @@ -98,27 +104,17 @@ object IgluUtils {
client: IgluCirceClient[F],
field: String = "ue_properties",
criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", 1, 0)
): F[Validated[FailureDetails.SchemaViolation, Option[SelfDescribingData[Json]]]] =
): F[Validated[FailureDetails.SchemaViolation, Option[SdjExtractResult]]] =
(Option(enriched.unstruct_event) match {
case Some(rawUnstructEvent) =>
for {
// Validate input Json string and extract unstructured event
unstruct <- extractInputData(rawUnstructEvent, field, criterion, client)
// Parse Json unstructured event as SelfDescribingData[Json]
unstructSDJ <- SelfDescribingData
.parse(unstruct)
.leftMap(FailureDetails.SchemaViolation.NotIglu(unstruct, _))
.toEitherT[F]
// Check SelfDescribingData[Json] of unstructured event
_ <- check(client, unstructSDJ)
.leftMap {
case (schemaKey, clientError) =>
FailureDetails.SchemaViolation.IgluError(schemaKey, clientError)
}
.leftWiden[FailureDetails.SchemaViolation]
unstructSDJ <- parseAndValidateSDJ_sv(unstruct, client)
} yield unstructSDJ.some
case None =>
EitherT.rightT[F, FailureDetails.SchemaViolation](none[SelfDescribingData[Json]])
EitherT.rightT[F, FailureDetails.SchemaViolation](none[SdjExtractResult])
}).toValidated

/**
Expand All @@ -135,7 +131,7 @@ object IgluUtils {
client: IgluCirceClient[F],
field: String = "contexts",
criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "contexts", "jsonschema", 1, 0)
): F[ValidatedNel[FailureDetails.SchemaViolation, List[SelfDescribingData[Json]]]] =
): F[ValidatedNel[FailureDetails.SchemaViolation, List[SdjExtractResult]]] =
(Option(enriched.contexts) match {
case Some(rawContexts) =>
for {
Expand All @@ -153,7 +149,7 @@ object IgluUtils {
} yield contextsSDJ
case None =>
EitherT.rightT[F, NonEmptyList[FailureDetails.SchemaViolation]](
List.empty[SelfDescribingData[Json]]
List.empty[SdjExtractResult]
)
}).toValidated

Expand Down Expand Up @@ -238,10 +234,13 @@ object IgluUtils {
private def check[F[_]: Monad: RegistryLookup: Clock](
client: IgluCirceClient[F],
sdj: SelfDescribingData[Json]
): EitherT[F, (SchemaKey, ClientError), Unit] =
): EitherT[F, (SchemaKey, ClientError), Option[SchemaVer.Full]] =
client
.check(sdj)
.leftMap(clientErr => (sdj.schema, clientErr))
.leftMap({
case (e, None) => (sdj.schema, e)
case (e, Some(superseding)) => (sdj.schema.copy(version = superseding), e)
})

/** Check a list of SDJs and merge the Iglu errors */
private def checkList[F[_]: Monad: RegistryLookup: Clock](
Expand All @@ -259,20 +258,48 @@ object IgluUtils {
private def parseAndValidateSDJ_sv[F[_]: Monad: RegistryLookup: Clock]( // _sv for SchemaViolation
json: Json,
client: IgluCirceClient[F]
): EitherT[F, FailureDetails.SchemaViolation, SelfDescribingData[Json]] =
): EitherT[F, FailureDetails.SchemaViolation, SdjExtractResult] =
for {
sdj <- SelfDescribingData
.parse(json)
.leftMap(FailureDetails.SchemaViolation.NotIglu(json, _))
.toEitherT[F]
_ <- check(client, sdj)
.leftMap {
case (schemaKey, clientError) =>
FailureDetails.SchemaViolation
.IgluError(schemaKey, clientError): FailureDetails.SchemaViolation
supersedingSchema <- check(client, sdj)
.leftMap {
case (schemaKey, clientError) =>
FailureDetails.SchemaViolation
.IgluError(schemaKey, clientError): FailureDetails.SchemaViolation

}
} yield sdj
}
supersedingInfo = supersedingSchema.map(s => SchemaSupersedingInfo(sdj.schema, s))
sdjUpdated = replaceSchemaVersion(sdj, supersedingInfo)
} yield SdjExtractResult(sdjUpdated, supersedingInfo)

/**
 * Point a self-describing JSON at the schema version that supersedes its own.
 *
 * @param sdj the self-describing JSON to update
 * @param supersedingInfo superseding information, if any
 * @return `sdj` unchanged when `supersedingInfo` is empty; otherwise a copy of
 *         `sdj` whose schema version is `supersededBy`
 */
private def replaceSchemaVersion(
  sdj: SelfDescribingData[Json],
  supersedingInfo: Option[SchemaSupersedingInfo]
): SelfDescribingData[Json] =
  supersedingInfo.fold(sdj) { info =>
    val supersedingKey = sdj.schema.copy(version = info.supersededBy)
    sdj.copy(schema = supersedingKey)
  }

/**
 * Records that a schema was superseded by another version.
 *
 * @param originalSchema key of the schema that was superseded
 * @param supersededBy version that supersedes `originalSchema`
 */
case class SchemaSupersedingInfo(originalSchema: SchemaKey, supersededBy: SchemaVer.Full) {

  /** Render this info as a self-describing JSON under `SchemaSupersedingInfo.schemaKey` */
  def toSdj: SelfDescribingData[Json] = {
    val payload: Json = (this: SchemaSupersedingInfo).asJson
    SelfDescribingData(SchemaSupersedingInfo.schemaKey, payload)
  }
}

object SchemaSupersedingInfo {

  /** Schema key used for the self-describing JSON form of a `SchemaSupersedingInfo` */
  val schemaKey: SchemaKey =
    SchemaKey("com.snowplowanalytics.iglu", "superseding_info", "jsonschema", SchemaVer.Full(1, 0, 0))

  /** Encodes a full schema version as a JSON string holding its string form */
  implicit val schemaVerFullEncoder: Encoder[SchemaVer.Full] =
    Encoder.instance[SchemaVer.Full](version => Json.fromString(version.asString))

  /** Derived encoder; declared after `schemaVerFullEncoder`, which it relies on */
  implicit val schemaSupersedingInfoEncoder: Encoder[SchemaSupersedingInfo] =
    deriveEncoder[SchemaSupersedingInfo]
}

/**
 * Result of extracting and validating a self-describing JSON.
 *
 * @param sdj the self-describing JSON
 * @param supersedingInfo superseding information attached to `sdj`, if any
 */
case class SdjExtractResult(sdj: SelfDescribingData[Json], supersedingInfo: Option[SchemaSupersedingInfo])

/** Build `BadRow.SchemaViolations` from a list of `FailureDetails.SchemaViolation`s */
def buildSchemaViolationsBadRow(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Superseding schema example",
"self": {
"vendor": "com.acme",
"name": "superseding_schema",
"format": "jsonschema",
"version": "1-0-0",
"supersededBy": "1-0-1"
},
"type": "object",
"properties": {
"field_a": {
"type": "string"
},
"field_b": {
"type": "string"
},
"field_c": {
"type": "string"
}
},
"required": ["field_a", "field_b"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Superseding schema example",
"self": {
"vendor": "com.acme",
"name": "superseding_schema",
"format": "jsonschema",
"version": "1-0-1"
},
"type": "object",
"properties": {
"field_a": {
"type": "string"
},
"field_b": {
"type": "string"
},
"field_c": {
"type": "string"
},
"field_d": {
"type": "string"
}
},
"required": ["field_a", "field_b"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Superseding schema example",
"self": {
"vendor": "com.acme",
"name": "superseding_schema",
"format": "jsonschema",
"version": "2-0-0",
"supersededBy": "2-0-1"
},
"type": "object",
"properties": {
"field_e": {
"type": "string"
},
"field_f": {
"type": "string"
}
},
"required": ["field_e", "field_f"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Superseding schema example",
"self": {
"vendor": "com.acme",
"name": "superseding_schema",
"format": "jsonschema",
"version": "2-0-1"
},
"type": "object",
"properties": {
"field_e": {
"type": "string"
},
"field_f": {
"type": "string"
},
"field_g": {
"type": "string"
}
},
"required": ["field_e", "field_f"],
"additionalProperties": false
}
Loading

0 comments on commit 5f71486

Please sign in to comment.