diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EventGenEtlPipelineSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EventGenEtlPipelineSpec.scala index 4b2210061..842434b02 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EventGenEtlPipelineSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EventGenEtlPipelineSpec.scala @@ -289,6 +289,12 @@ class EventGenEtlPipelineSpec extends Specification with CatsIO { .map(_.toJson(false)) .map(_.foldWith(folder)) .map(_.noSpaces) + .map( + _.replaceAll( + "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0", + "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1" + ) + ) .map(decode[IntermediateEvent]) .rethrow .map(IntermediateEvent.pad) diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/BlackBoxTesting.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/BlackBoxTesting.scala index b96d2eaf6..6e04efd2b 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/BlackBoxTesting.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/BlackBoxTesting.scala @@ -27,6 +27,7 @@ import cats.data.Validated.{Invalid, Valid} import io.circe.Json import io.circe.syntax._ +import io.circe.parser.{parse => jparse} import org.apache.thrift.TSerializer @@ -116,10 +117,18 @@ object BlackBoxTesting extends Specification with CatsIO { private def checkEnriched(enriched: EnrichedEvent, expectedFields: Map[String, String]) = { val asMap = getMap(enriched) - val r = expectedFields.map { case (k, v) => asMap.get(k) must beSome(v) } + val r = expectedFields.map { + case (k, v) if k == "unstruct_event" || k == "contexts" || k == "derived_contexts" => + compareJsons(asMap.getOrElse(k, ""), v) must beTrue + case (k, v) => + asMap.get(k) must beSome(v) + } r.toList.reduce(_ and _) } + private def compareJsons(j1: String, j2: String): Boolean = + j1 == j2 || jparse(j1).toOption.get == jparse(j2).toOption.get + private val enrichedFields = classOf[EnrichedEvent].getDeclaredFields() enrichedFields.foreach(_.setAccessible(true)) diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/enrichments/ApiRequestEnrichmentSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/enrichments/ApiRequestEnrichmentSpec.scala index 2880580f3..640a2ae91 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/enrichments/ApiRequestEnrichmentSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/enrichments/ApiRequestEnrichmentSpec.scala @@ -40,7 +40,7 @@ class ApiRequestEnrichmentSpec extends Specification with CatsIO { ) val expected = Map( "unstruct_event" -> json"""{"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow-website/signup_form_submitted/jsonschema/1-0-0","data":{"name":"BobĀ®","email":"alex+test@snowplowanalytics.com","company":"SP","eventsPerMonth":"< 1 million","serviceType":"unsure"}}}""".noSpaces, - "contexts" -> json"""{"data":[{"data":{"osType":"OSX","appleIdfa":"some_appleIdfa","openIdfa":"some_Idfa","carrier":"some_carrier","deviceModel":"large","osVersion":"3.0.0","appleIdfa":"some_appleIdfa","androidIdfa":"some_androidIdfa","deviceManufacturer":"Amstrad"},"schema":"iglu:com.snowplowanalytics.snowplow/mobile_context/jsonschema/1-0-0"},{"data":{"longitude":10,"bearing":50,"speed":16,"altitude":20,"altitudeAccuracy":0.3,"latitudeLongitudeAccuracy":0.5,"latitude":7},"schema":"iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-0-0"}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0"}""".noSpaces, + "contexts" -> json"""{"data":[{"data":{"osType":"OSX","appleIdfa":"some_appleIdfa","openIdfa":"some_Idfa","carrier":"some_carrier","deviceModel":"large","osVersion":"3.0.0","appleIdfa":"some_appleIdfa","androidIdfa":"some_androidIdfa","deviceManufacturer":"Amstrad"},"schema":"iglu:com.snowplowanalytics.snowplow/mobile_context/jsonschema/1-0-0"},{"data":{"longitude":10,"bearing":50,"speed":16,"altitude":20,"altitudeAccuracy":0.3,"latitudeLongitudeAccuracy":0.5,"latitude":7},"schema":"iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-0-0"}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"}""".noSpaces, "derived_contexts" -> json"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.statusgator/status_change/jsonschema/1-0-0","data":{"serviceName": "sp-api-request-enrichment"}}]}""".noSpaces ) BlackBoxTesting.runTest(input, expected, Some(ApiRequestEnrichmentSpec.conf)) diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/misc/TransactionSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/misc/TransactionSpec.scala index be55a5f4b..7f23f3ff9 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/misc/TransactionSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/blackbox/misc/TransactionSpec.scala @@ -49,7 +49,7 @@ class TransactionSpec extends Specification with CatsIO { "txn_id" -> "28288", "tr_country" -> "UK", "tr_city" -> "London", - "contexts" -> json"""{"data":[{"schema":"iglu:com.snowplowanalytics.snowplow/uri_redirect/jsonschema/1-0-0","data":{"uri":"http://snowplowanalytics.com/"}}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0"}""".noSpaces + "contexts" -> json"""{"data":[{"schema":"iglu:com.snowplowanalytics.snowplow/uri_redirect/jsonschema/1-0-0","data":{"uri":"http://snowplowanalytics.com/"}}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"}""".noSpaces ) BlackBoxTesting.runTest(input, expected) } diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/Tp2Adapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/Tp2Adapter.scala index 94db6064f..a9af72dfc 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/Tp2Adapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/snowplow/Tp2Adapter.scala @@ -234,12 +234,13 @@ object Tp2Adapter extends Adapter { ) _ <- client .check(sd) - .leftMap(e => - NonEmptyList.one( - FailureDetails.TrackerProtocolViolation - .IgluError(sd.schema, e) - ) - ) + .leftMap({ + case (e, _) => + NonEmptyList.one( + FailureDetails.TrackerProtocolViolation + .IgluError(sd.schema, e) + ) + }) .subflatMap { _ => schemaCriterion.matches(sd.schema) match { case true => ().asRight diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala index 4184acc66..e678eaa21 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala @@ -80,10 +80,11 @@ object EnrichmentManager { unstructEvent, featureFlags.legacyEnrichmentOrder ) - _ <- EitherT.rightT[F, BadRow] { - if (enrichmentsContexts.nonEmpty) - enriched.derived_contexts = ME.formatDerivedContexts(enrichmentsContexts) - } + _ = { + ME.formatUnstructEvent(unstructEvent).foreach(e => enriched.unstruct_event = e) + ME.formatContexts(inputContexts).foreach(c => enriched.contexts = c) + ME.formatContexts(enrichmentsContexts).foreach(c => enriched.derived_contexts = c) + } _ <- IgluUtils .validateEnrichmentsContexts[F](client, enrichmentsContexts, raw, processor, enriched) _ <- EitherT.rightT[F, BadRow]( diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala index b29489e05..5dad49e53 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala @@ -65,7 +65,7 @@ object EnrichmentRegistry { ) _ <- client .check(sd) - .leftMap(e => NonEmptyList.one(e.asJson.noSpaces)) + .leftMap({ case (e, _) => NonEmptyList.one(e.asJson.noSpaces) }) .subflatMap { _ => EnrichmentConfigSchemaCriterion.matches(sd.schema) match { case true => ().asRight @@ -92,9 +92,10 @@ object EnrichmentRegistry { ) _ <- client .check(sd) - .leftMap(e => - NonEmptyList.one(s"Enrichment with key '${sd.schema.toSchemaUri}` is invalid - ${e.asJson.noSpaces}") - ) + .leftMap({ + case (e, _) => + NonEmptyList.one(s"Enrichment with key '${sd.schema.toSchemaUri}` is invalid - ${e.asJson.noSpaces}") + }) conf <- EitherT.fromEither[F]( buildEnrichmentConfig(sd.schema, sd.data, localMode).toEither ) diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/MiscEnrichments.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/MiscEnrichments.scala index b98e45f22..15baaff23 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/MiscEnrichments.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/MiscEnrichments.scala @@ -30,6 +30,9 @@ object MiscEnrichments { val ContextsSchema = SchemaKey("com.snowplowanalytics.snowplow", "contexts", "jsonschema", SchemaVer.Full(1, 0, 1)) + val UnstructEventSchema = + SchemaKey("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", SchemaVer.Full(1, 0, 0)) + /** * The version of this ETL. Appends this version to the supplied "host" ETL. * @param processor The version of the host ETL running this library @@ -82,6 +85,11 @@ object MiscEnrichments { } /** Turn a list of custom contexts into a self-describing JSON property */ - def formatDerivedContexts(derivedContexts: List[SelfDescribingData[Json]]): String = - SelfDescribingData(ContextsSchema, Json.arr(derivedContexts.map(_.normalize): _*)).asString + def formatContexts(contexts: List[SelfDescribingData[Json]]): Option[String] = + if (contexts.isEmpty) None + else Some(SelfDescribingData(ContextsSchema, Json.arr(contexts.map(_.normalize): _*)).asString) + + /** Turn a unstruct event into a self-describing JSON property */ + def formatUnstructEvent(unstructEvent: Option[SelfDescribingData[Json]]): Option[String] = + unstructEvent.map(e => SelfDescribingData(UnstructEventSchema, e.normalize).asString) } diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala index 45cbff319..842648230 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala @@ -17,14 +17,16 @@ import cats.data.{EitherT, NonEmptyList, Validated, ValidatedNel} import cats.effect.Clock import cats.implicits._ -import io.circe.Json +import io.circe._ +import io.circe.syntax._ +import io.circe.generic.semiauto._ import java.time.Instant import com.snowplowanalytics.iglu.client.{ClientError, IgluCirceClient} import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup -import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData} +import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.badrows._ @@ -71,7 +73,11 @@ object IgluUtils { .map(_.toValidatedNel) } yield (contexts, unstruct) .mapN { (c, ue) => - (c, ue) + val supersedingInfoContexts = (c.flatMap(_.supersedingInfo) ::: ue.flatMap(_.supersedingInfo).toList).distinct + .map(_.toSdj) + val contexts = c.map(_.sdj) ::: supersedingInfoContexts + val unstruct = ue.map(_.sdj) + (contexts, unstruct) } .leftMap { schemaViolations => buildSchemaViolationsBadRow( @@ -98,27 +104,17 @@ object IgluUtils { client: IgluCirceClient[F], field: String = "ue_properties", criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", 1, 0) - ): F[Validated[FailureDetails.SchemaViolation, Option[SelfDescribingData[Json]]]] = + ): F[Validated[FailureDetails.SchemaViolation, Option[SdjExtractResult]]] = (Option(enriched.unstruct_event) match { case Some(rawUnstructEvent) => for { // Validate input Json string and extract unstructured event unstruct <- extractInputData(rawUnstructEvent, field, criterion, client) // Parse Json unstructured event as SelfDescribingData[Json] - unstructSDJ <- SelfDescribingData - .parse(unstruct) - .leftMap(FailureDetails.SchemaViolation.NotIglu(unstruct, _)) - .toEitherT[F] - // Check SelfDescribingData[Json] of unstructured event - _ <- check(client, unstructSDJ) - .leftMap { - case (schemaKey, clientError) => - FailureDetails.SchemaViolation.IgluError(schemaKey, clientError) - } - .leftWiden[FailureDetails.SchemaViolation] + unstructSDJ <- parseAndValidateSDJ_sv(unstruct, client) } yield unstructSDJ.some case None => - EitherT.rightT[F, FailureDetails.SchemaViolation](none[SelfDescribingData[Json]]) + EitherT.rightT[F, FailureDetails.SchemaViolation](none[SdjExtractResult]) }).toValidated /** @@ -135,7 +131,7 @@ object IgluUtils { client: IgluCirceClient[F], field: String = "contexts", criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "contexts", "jsonschema", 1, 0) - ): F[ValidatedNel[FailureDetails.SchemaViolation, List[SelfDescribingData[Json]]]] = + ): F[ValidatedNel[FailureDetails.SchemaViolation, List[SdjExtractResult]]] = (Option(enriched.contexts) match { case Some(rawContexts) => for { @@ -153,7 +149,7 @@ object IgluUtils { } yield contextsSDJ case None => EitherT.rightT[F, NonEmptyList[FailureDetails.SchemaViolation]]( - List.empty[SelfDescribingData[Json]] + List.empty[SdjExtractResult] ) }).toValidated @@ -238,10 +234,13 @@ object IgluUtils { private def check[F[_]: Monad: RegistryLookup: Clock]( client: IgluCirceClient[F], sdj: SelfDescribingData[Json] - ): EitherT[F, (SchemaKey, ClientError), Unit] = + ): EitherT[F, (SchemaKey, ClientError), Option[SchemaVer.Full]] = client .check(sdj) - .leftMap(clientErr => (sdj.schema, clientErr)) + .leftMap({ + case (e, None) => (sdj.schema, e) + case (e, Some(superseding)) => (sdj.schema.copy(version = superseding), e) + }) /** Check a list of SDJs and merge the Iglu errors */ private def checkList[F[_]: Monad: RegistryLookup: Clock]( @@ -259,20 +258,48 @@ object IgluUtils { private def parseAndValidateSDJ_sv[F[_]: Monad: RegistryLookup: Clock]( // _sv for SchemaViolation json: Json, client: IgluCirceClient[F] - ): EitherT[F, FailureDetails.SchemaViolation, SelfDescribingData[Json]] = + ): EitherT[F, FailureDetails.SchemaViolation, SdjExtractResult] = for { sdj <- SelfDescribingData .parse(json) .leftMap(FailureDetails.SchemaViolation.NotIglu(json, _)) .toEitherT[F] - _ <- check(client, sdj) - .leftMap { - case (schemaKey, clientError) => - FailureDetails.SchemaViolation - .IgluError(schemaKey, clientError): FailureDetails.SchemaViolation + supersedingSchema <- check(client, sdj) + .leftMap { + case (schemaKey, clientError) => + FailureDetails.SchemaViolation + .IgluError(schemaKey, clientError): FailureDetails.SchemaViolation - } - } yield sdj + } + supersedingInfo = supersedingSchema.map(s => SchemaSupersedingInfo(sdj.schema, s)) + sdjUpdated = replaceSchemaVersion(sdj, supersedingInfo) + } yield SdjExtractResult(sdjUpdated, supersedingInfo) + + private def replaceSchemaVersion( + sdj: SelfDescribingData[Json], + supersedingInfo: Option[SchemaSupersedingInfo] + ): SelfDescribingData[Json] = + supersedingInfo match { + case None => sdj + case Some(s) => sdj.copy(schema = sdj.schema.copy(version = s.supersededBy)) + } + + case class SchemaSupersedingInfo(originalSchema: SchemaKey, supersededBy: SchemaVer.Full) { + def toSdj: SelfDescribingData[Json] = + SelfDescribingData(SchemaSupersedingInfo.schemaKey, (this: SchemaSupersedingInfo).asJson) + } + + object SchemaSupersedingInfo { + val schemaKey = SchemaKey("com.snowplowanalytics.iglu", "superseding_info", "jsonschema", SchemaVer.Full(1, 0, 0)) + + implicit val schemaVerFullEncoder: Encoder[SchemaVer.Full] = + Encoder.encodeString.contramap(v => v.asString) + + implicit val schemaSupersedingInfoEncoder: Encoder[SchemaSupersedingInfo] = + deriveEncoder[SchemaSupersedingInfo] + } + + case class SdjExtractResult(sdj: SelfDescribingData[Json], supersedingInfo: Option[SchemaSupersedingInfo]) /** Build `BadRow.SchemaViolations` from a list of `FailureDetails.SchemaViolation`s */ def buildSchemaViolationsBadRow( diff --git a/modules/common/src/test/resources/iglu-schemas/schemas/com.acme/superseding_example/jsonschema/1-0-0 b/modules/common/src/test/resources/iglu-schemas/schemas/com.acme/superseding_example/jsonschema/1-0-0 new file mode 100644 index 000000000..2105d988b --- /dev/null +++ b/modules/common/src/test/resources/iglu-schemas/schemas/com.acme/superseding_example/jsonschema/1-0-0 @@ -0,0 +1,25 @@ +{ + "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "$supersededBy": "1-0-1", + "description": "Superseding schema example", + "self": { + "vendor": "com.acme", + "name": "superseding_schema", + "format": "jsonschema", + "version": "1-0-0" + }, + "type": "object", + "properties": { + "field_a": { + "type": "string" + }, + "field_b": { + "type": "string" + }, + "field_c": { + "type": "string" + } + }, + "required": ["field_a", "field_b"], + "additionalProperties": false +} diff --git a/modules/common/src/test/resources/iglu-schemas/schemas/com.acme/superseding_example/jsonschema/1-0-1 b/modules/common/src/test/resources/iglu-schemas/schemas/com.acme/superseding_example/jsonschema/1-0-1 new file mode 100644 index 000000000..9d6067607 --- /dev/null +++ b/modules/common/src/test/resources/iglu-schemas/schemas/com.acme/superseding_example/jsonschema/1-0-1 @@ -0,0 +1,27 @@ +{ + "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "description": "Superseding schema example", + "self": { + "vendor": "com.acme", + "name": "superseding_schema", + "format": "jsonschema", + "version": "1-0-1" + }, + "type": "object", + "properties": { + "field_a": { + "type": "string" + }, + "field_b": { + "type": "string" + }, + "field_c": { + "type": "string" + }, + "field_d": { + "type": "string" + } + }, + "required": ["field_a", "field_b"], + "additionalProperties": false +} diff --git a/modules/common/src/test/resources/iglu-schemas/schemas/com.acme/superseding_example/jsonschema/2-0-0 b/modules/common/src/test/resources/iglu-schemas/schemas/com.acme/superseding_example/jsonschema/2-0-0 new file mode 100644 index 000000000..63b5db4bc --- /dev/null +++ b/modules/common/src/test/resources/iglu-schemas/schemas/com.acme/superseding_example/jsonschema/2-0-0 @@ -0,0 +1,22 @@ +{ + "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "$supersededBy": "2-0-1", + "description": "Superseding schema example", + "self": { + "vendor": "com.acme", + "name": "superseding_schema", + "format": "jsonschema", + "version": "2-0-0" + }, + "type": "object", + "properties": { + "field_e": { + "type": "string" + }, + "field_f": { + "type": "string" + } + }, + "required": ["field_e", "field_f"], + "additionalProperties": false +} \ No newline at end of file diff --git a/modules/common/src/test/resources/iglu-schemas/schemas/com.acme/superseding_example/jsonschema/2-0-1 b/modules/common/src/test/resources/iglu-schemas/schemas/com.acme/superseding_example/jsonschema/2-0-1 new file mode 100644 index 000000000..81baee2d0 --- /dev/null +++ b/modules/common/src/test/resources/iglu-schemas/schemas/com.acme/superseding_example/jsonschema/2-0-1 @@ -0,0 +1,24 @@ +{ + "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "description": "Superseding schema example", + "self": { + "vendor": "com.acme", + "name": "superseding_schema", + "format": "jsonschema", + "version": "2-0-1" + }, + "type": "object", + "properties": { + "field_e": { + "type": "string" + }, + "field_f": { + "type": "string" + }, + "field_g": { + "type": "string" + } + }, + "required": ["field_e", "field_f"], + "additionalProperties": false +} \ No newline at end of file diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala index 7be51554d..d5123632d 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala @@ -18,6 +18,7 @@ import cats.Id import cats.implicits._ import cats.data.NonEmptyList import io.circe.literal._ +import io.circe.parser.{parse => jparse} import org.joda.time.DateTime import com.snowplowanalytics.snowplow.badrows._ import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer} @@ -780,6 +781,155 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers { enriched.value.map(_.useragent) must beRight(qs_ua) enriched.value.map(_.derived_contexts) must beRight((_: String).contains("\"agentName\":\"%1$S\"")) } + + "emit an EnrichedEvent with superseded schemas" >> { + val expectedContexts = jparse( + """ + { + "schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1", + "data": [ + { + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "emailAddress": "hello@world.com", + "emailAddress2": "foo@bar.org" + } + }, + { + "schema":"iglu:com.acme/superseding_example/jsonschema/1-0-1", + "data": { + "field_a": "value_a", + "field_b": "value_b" + } + }, + { + "schema":"iglu:com.acme/superseding_example/jsonschema/1-0-1", + "data": { + "field_a": "value_a", + "field_b": "value_b", + "field_d": "value_d" + } + }, + { + "schema":"iglu:com.acme/superseding_example/jsonschema/1-0-1", + "data": { + "field_a": "value_a", + "field_b": "value_b", + "field_c": "value_c", + "field_d": "value_d" + } + }, + { + "schema":"iglu:com.snowplowanalytics.iglu/superseding_info/jsonschema/1-0-0", + "data":{ + "originalSchema":"iglu:com.acme/superseding_example/jsonschema/1-0-0", + "supersededBy":"1-0-1" + } + }, + { + "schema":"iglu:com.snowplowanalytics.iglu/superseding_info/jsonschema/1-0-0", + "data":{ + "originalSchema":"iglu:com.acme/superseding_example/jsonschema/2-0-0", + "supersededBy":"2-0-1" + } + } + ] + } + """ + ).toOption.get + val expectedUnstructEvent = jparse( + """ + { + "schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", + "data":{ + "schema":"iglu:com.acme/superseding_example/jsonschema/2-0-1", + "data": { + "field_e": "value_e", + "field_f": "value_f", + "field_g": "value_g" + } + } + } + """ + ).toOption.get + val parameters = Map( + "e" -> "ue", + "tv" -> "js-0.13.1", + "p" -> "web", + "co" -> + """ + { + "schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0", + "data": [ + { + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "emailAddress": "hello@world.com", + "emailAddress2": "foo@bar.org" + } + }, + { + "schema":"iglu:com.acme/superseding_example/jsonschema/1-0-0", + "data": { + "field_a": "value_a", + "field_b": "value_b" + } + }, + { + "schema":"iglu:com.acme/superseding_example/jsonschema/1-0-0", + "data": { + "field_a": "value_a", + "field_b": "value_b", + "field_d": "value_d" + } + }, + { + "schema":"iglu:com.acme/superseding_example/jsonschema/1-0-1", + "data": { + "field_a": "value_a", + "field_b": "value_b", + "field_c": "value_c", + "field_d": "value_d" + } + } + ] + } + """, + "ue_pr" -> + """ + { + "schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", + "data":{ + "schema":"iglu:com.acme/superseding_example/jsonschema/2-0-0", + "data": { + "field_e": "value_e", + "field_f": "value_f", + "field_g": "value_g" + } + } + }""" + ).toOpt + val rawEvent = RawEvent(api, parameters, None, source, context) + val enriched = EnrichmentManager.enrichEvent[Id]( + enrichmentReg, + client, + processor, + timestamp, + rawEvent, + AcceptInvalid.featureFlags, + AcceptInvalid.countInvalid + ) + + enriched.value must beRight.like { + case e: EnrichedEvent => + val p = EnrichedEvent.toPartiallyEnrichedEvent(e) + val contextsJson = jparse(p.contexts.get).toOption.get + val ueJson = jparse(p.unstruct_event.get).toOption.get + (contextsJson must beEqualTo(expectedContexts)) and + (ueJson must beEqualTo(expectedUnstructEvent)) + case _ => ko + } + } } "getIabContext" should { diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/miscEnrichmentSpecs.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/miscEnrichmentSpecs.scala index e01bf2bc2..61b235960 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/miscEnrichmentSpecs.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/miscEnrichmentSpecs.scala @@ -91,9 +91,9 @@ class ExtractIpSpec extends Specification with DataTables { } -class FormatDerivedContextsSpec extends MutSpecification { +class FormatContextsSpec extends MutSpecification { - "extractDerivedContexts" should { + "extractContexts" should { "convert a list of JObjects to a self-describing contexts JSON" in { val derivedContextsList = List( @@ -128,7 +128,35 @@ class FormatDerivedContextsSpec extends MutSpecification { |] |}""".stripMargin.replaceAll("[\n\r]", "") - MiscEnrichments.formatDerivedContexts(derivedContextsList) must_== expected + MiscEnrichments.formatContexts(derivedContextsList) must beSome(expected) + } + } +} + +class FormatUnstructEventSpec extends MutSpecification { + + "extractUnstructEvent" should { + "convert a JObject to a self-describing unstruct event JSON" in { + + val unstructEvent = SelfDescribingData( + SchemaKey("com.acme", "design", "jsonschema", SchemaVer.Full(1, 0, 0)), + json"""{"color": "red", "fontSize": 14}""" + ) + + val expected = """ + |{ + |"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", + |"data": + |{ + |"schema":"iglu:com.acme/design/jsonschema/1-0-0", + |"data":{ + |"color":"red", + |"fontSize":14 + |} + |} + |}""".stripMargin.replaceAll("[\n\r]", "") + + MiscEnrichments.formatUnstructEvent(Some(unstructEvent)) must beSome(expected) } } } diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala index 6d697e0a1..0d013efb5 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala @@ -21,6 +21,7 @@ import com.snowplowanalytics.iglu.client.ClientError.{ResolutionError, Validatio import com.snowplowanalytics.snowplow.badrows._ import io.circe.Json +import io.circe.parser.parse import cats.data.NonEmptyList @@ -65,6 +66,14 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers { "jsonschema", SchemaVer.Full(1, 0, 0) ) + val supersedingExampleSchema100 = + SchemaKey( + "com.acme", + "superseding_example", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val supersedingExampleSchema101 = supersedingExampleSchema100.copy(version = SchemaVer.Full(1, 0, 1)) val clientSessionSchema = SchemaKey( "com.snowplowanalytics.snowplow", @@ -92,6 +101,23 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers { "emailAddress": "hello@world.com" } }""" + val supersedingExample1 = + s"""{ + "schema": "${supersedingExampleSchema100.toSchemaUri}", + "data": { + "field_a": "value_a", + "field_b": "value_b" + } + }""" + val supersedingExample2 = + s"""{ + "schema": "${supersedingExampleSchema100.toSchemaUri}", + "data": { + "field_a": "value_a", + "field_b": "value_b", + "field_d": "value_d" + } + }""" val clientSession = s"""{ "schema": "${clientSessionSchema.toSchemaUri}", "data": { @@ -201,10 +227,41 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers { IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client) must beValid.like { - case Some(sdj) if sdj.schema == emailSentSchema => ok - case Some(sdj) => + case Some(IgluUtils.SdjExtractResult(sdj, None)) if sdj.schema == emailSentSchema => ok + case Some(s) => + ko( + s"unstructured event's schema [${s.sdj.schema}] does not match expected schema [${emailSentSchema}]" + ) + case None => ko("no unstructured event was extracted") + } + } + + "return the extracted unstructured event when schema is superseded by another schema" >> { + val input1 = new EnrichedEvent + input1.setUnstruct_event(buildUnstruct(supersedingExample1)) + + val input2 = new EnrichedEvent + input2.setUnstruct_event(buildUnstruct(supersedingExample2)) + + val expectedSupersedingInfo = IgluUtils.SchemaSupersedingInfo(supersedingExampleSchema100, supersedingExampleSchema101.version) + + IgluUtils + .extractAndValidateUnstructEvent(input1, SpecHelpers.client) must beValid.like { + case Some(IgluUtils.SdjExtractResult(sdj, Some(`expectedSupersedingInfo`))) if sdj.schema == supersedingExampleSchema101 => ok + case Some(s) => + ko( + s"unstructured event's schema [${s.sdj.schema}] does not match expected schema [${supersedingExampleSchema101}]" + ) + case None => ko("no unstructured event was extracted") + } + + // input2 wouldn't be validated with 1-0-0. It would be validated with 1-0-1 only. + IgluUtils + .extractAndValidateUnstructEvent(input2, SpecHelpers.client) must beValid.like { + case Some(IgluUtils.SdjExtractResult(sdj, Some(`expectedSupersedingInfo`))) if sdj.schema == supersedingExampleSchema101 => ok + case Some(s) => ko( - s"unstructured event's schema [${sdj.schema}] does not match expected schema [${emailSentSchema}]" + s"unstructured event's schema [${s.sdj.schema}] does not match expected schema [${supersedingExampleSchema101}]" ) case None => ko("no unstructured event was extracted") } @@ -326,7 +383,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers { IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client) must beValid.like { - case sdjs: List[SelfDescribingData[Json]] if sdjs.size == 2 && sdjs.forall(_.schema == emailSentSchema) => + case sdjs if sdjs.size == 2 && sdjs.forall(i => i.sdj.schema == emailSentSchema && i.supersedingInfo.isEmpty) => ok case res => ko(s"[$res] are not 2 SDJs with expected schema [${emailSentSchema.toSchemaUri}]") @@ -338,12 +395,24 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers { input.setContexts(buildInputContexts(List(clientSession))) IgluUtils.extractAndValidateInputContexts(input, SpecHelpers.client) must beValid.like { - case sdj: List[SelfDescribingData[Json]] if sdj.size == 1 && sdj.forall(_.schema == clientSessionSchema) => + case sdj if sdj.size == 1 && sdj.forall(_.sdj.schema == clientSessionSchema) => ok case _ => ko("$.previousSessionId: is missing but it is required") } } + + "return the extracted context when schema is superseded by another schema" >> { + val input = new EnrichedEvent + input.setContexts(buildInputContexts(List(supersedingExample1, supersedingExample2))) + + IgluUtils.extractAndValidateInputContexts(input, SpecHelpers.client) must beValid.like { + case sdj if sdj.size == 2 && sdj.forall(_.sdj.schema == supersedingExampleSchema101) => + ok + case _ => + ko("Failed to extract context when schema is superseded by another schema") + } + } } "validateEnrichmentsContexts" should { @@ -515,6 +584,42 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers { ) } } + + "return the extracted unstructured event and the extracted input contexts when schema is superseded by another schema" >> { + val input = new EnrichedEvent + input.setUnstruct_event(buildUnstruct(supersedingExample1)) + input.setContexts(buildInputContexts(List(supersedingExample1, supersedingExample2))) + + val expectedSupersedingInfoContext = parse( + """ { + | "originalSchema" : "iglu:com.acme/superseding_example/jsonschema/1-0-0", + | "supersededBy" : "1-0-1" + |}""".stripMargin + ).toOption.get + + IgluUtils + .extractAndValidateInputJsons( + input, + SpecHelpers.client, + raw, + processor + ) + .value must beRight.like { + case (sdjs: List[SelfDescribingData[Json]], Some(sdj: SelfDescribingData[Json])) + if sdjs.size == 3 + && sdj.schema == supersedingExampleSchema101 + && sdjs.count(_.schema == supersedingExampleSchema101) == 2 + && sdjs.count(s => + s.schema == IgluUtils.SchemaSupersedingInfo.schemaKey + && s.data == expectedSupersedingInfoContext + ) == 1 => + ok + case (list, opt) => + ko( + s"[($list, $opt)] is not a list with 2 extracted contexts and an option with the extracted unstructured event" + ) + } + } } def buildUnstruct(sdj: String) = diff --git a/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/good/PagePingWithContextSpec.scala b/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/good/PagePingWithContextSpec.scala index d962b9e46..f104c91d6 100644 --- a/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/good/PagePingWithContextSpec.scala +++ b/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/good/PagePingWithContextSpec.scala @@ -79,7 +79,7 @@ object PagePingWithContextSpec { "", "", "", - """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"b05b31c3-81ac-4af5-92d1-113133968655"}}]}""", + """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"b05b31c3-81ac-4af5-92d1-113133968655"}}]}""", "", "", "", diff --git a/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/good/PageViewWithContextSpec.scala b/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/good/PageViewWithContextSpec.scala index d91355be8..40878a317 100644 --- a/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/good/PageViewWithContextSpec.scala +++ b/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/good/PageViewWithContextSpec.scala @@ -79,7 +79,7 @@ object PageViewWithContextSpec { "", "", "", - """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"b05b31c3-81ac-4af5-92d1-113133968655"}}]}""", + """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"b05b31c3-81ac-4af5-92d1-113133968655"}}]}""", "", "", "", diff --git a/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/good/StructEventWithContextSpec.scala b/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/good/StructEventWithContextSpec.scala index 1a9f9c5da..2d1689130 100644 --- a/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/good/StructEventWithContextSpec.scala +++ b/modules/stream/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/good/StructEventWithContextSpec.scala @@ -79,7 +79,7 @@ object StructEventWithContextSpec { "", "", "", - """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"b05b31c3-81ac-4af5-92d1-113133968655"}}]}""", + """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"b05b31c3-81ac-4af5-92d1-113133968655"}}]}""", "Checkout", "Add", "ASO01043", diff --git a/project/Dependencies.scala b/project/Dependencies.scala index df4070304..6dab75ba7 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -62,7 +62,7 @@ object Dependencies { val gatlingJsonpath = "0.6.14" val scalaUri = "1.5.1" val badRows = "2.1.1" - val igluClient = "1.3.0" + val igluClient = "1.4.0-M1" val snowplowRawEvent = "0.1.0" val collectorPayload = "0.0.0"