diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFields.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFields.scala index 26e513760..2ddbc83b4 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFields.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFields.scala @@ -12,6 +12,8 @@ package com.snowplowanalytics.snowplow.enrich.common.enrichments import cats.data.NonEmptyList +import io.circe.syntax._ + import com.snowplowanalytics.snowplow.badrows.FailureDetails import com.snowplowanalytics.iglu.client.ClientError.ValidationError @@ -133,12 +135,19 @@ object AtomicFields { AtomicFields(withLimits) } - def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): FailureDetails.SchemaViolation = { + def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): Failure.SchemaViolation = { val clientError = ValidationError(ValidatorError.InvalidData(errors), None) - FailureDetails.SchemaViolation.IgluError( - AtomicFields.atomicSchema, - clientError + val failureData = errors.toList.flatMap(e => e.path.map(p => p := e.keyword)).toMap.asJson + + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.IgluError( + AtomicFields.atomicSchema, + clientError + ), + // Source atomic field and actual value of the field should be already on the ValidatorReport list + source = "atomic_field", + data = failureData ) } } diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala index c14a7a6d2..01929f75a 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala @@ -19,8 +19,6 @@ import cats.implicits._ import com.snowplowanalytics.iglu.client.validator.ValidatorReport -import com.snowplowanalytics.snowplow.badrows.FailureDetails - import com.snowplowanalytics.snowplow.enrich.common.enrichments.AtomicFields.LimitedAtomicField import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent @@ -37,7 +35,7 @@ object AtomicFieldsLengthValidator { acceptInvalid: Boolean, invalidCount: F[Unit], atomicFields: AtomicFields - ): IorT[F, FailureDetails.SchemaViolation, Unit] = + ): IorT[F, Failure.SchemaViolation, Unit] = IorT { atomicFields.value .map(validateField(event, _).toValidatedNel) diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala index 54be85e71..6737dccfe 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala @@ -30,7 +30,7 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.badrows._ -import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Payload, Processor} +import com.snowplowanalytics.snowplow.badrows.{Failure => BadRowFailure} import com.snowplowanalytics.snowplow.enrich.common.{EtlPipeline, QueryStringParameters, RawEventParameters} import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent @@ -71,52 +71,109 @@ object EnrichmentManager { atomicFields: AtomicFields, emitIncomplete: Boolean ): IorT[F, BadRow, EnrichedEvent] = { - val iorT: IorT[F, NonEmptyList[BadRow], EnrichedEvent] = for { - enriched <- IorT.pure[F, NonEmptyList[BadRow]](new EnrichedEvent) - extractResult <- mapAndValidateInput( - raw, - enriched, - etlTstamp, - processor, - client, - registryLookup - ) - .leftMap(NonEmptyList.one) - .possiblyExitingEarly(emitIncomplete) - // Next 2 lines remove the invalid contexts and the invalid unstructured event from the event. - // This should be done after the bad row was created and only if emitIncomplete is enabled. - _ = { - enriched.contexts = ME.formatContexts(extractResult.contexts).orNull - enriched.unstruct_event = ME.formatUnstructEvent(extractResult.unstructEvent).orNull - } - enrichmentsContexts <- runEnrichments( - registry, - processor, - raw, - enriched, - extractResult.contexts, - extractResult.unstructEvent, - featureFlags.legacyEnrichmentOrder - ) - .leftMap(NonEmptyList.one) - .possiblyExitingEarly(emitIncomplete) - _ <- validateEnriched( - enriched, - raw, - enrichmentsContexts, - extractResult.validationInfoContexts, - client, - processor, - registryLookup, - featureFlags.acceptInvalid, - invalidCount, - atomicFields - ) - .leftMap(NonEmptyList.one) - .possiblyExitingEarly(emitIncomplete) - } yield enriched + def enrich(enriched: EnrichedEvent): IorT[F, NonEmptyList[NonEmptyList[Failure]], List[SelfDescribingData[Json]]] = + for { + extractResult <- mapAndValidateInput( + raw, + enriched, + etlTstamp, + processor, + client, + registryLookup + ) + .leftMap(NonEmptyList.one) + .possiblyExitingEarly(emitIncomplete) + // Next 2 lines remove the invalid contexts and the invalid unstructured event from the event. + // This should be done after the bad row was created and only if emitIncomplete is enabled. + _ = { + enriched.contexts = ME.formatContexts(extractResult.contexts).orNull + enriched.unstruct_event = ME.formatUnstructEvent(extractResult.unstructEvent).orNull + } + enrichmentsContexts <- runEnrichments( + registry, + raw, + enriched, + extractResult.contexts, + extractResult.unstructEvent, + featureFlags.legacyEnrichmentOrder + ) + .leftMap(NonEmptyList.one) + .possiblyExitingEarly(emitIncomplete) + validContexts <- validateEnriched( + enriched, + enrichmentsContexts, + client, + registryLookup, + featureFlags.acceptInvalid, + invalidCount, + atomicFields + ) + .leftMap(NonEmptyList.one) + .possiblyExitingEarly(emitIncomplete) + derivedContexts = validContexts ::: extractResult.validationInfoContexts + } yield derivedContexts + + // derived contexts are set lastly because we want to include failure entities + // to derived contexts as well and we can get failure entities only in the end + // of the enrichment process + IorT( + for { + enrichedEvent <- Sync[F].delay(new EnrichedEvent) + enrichmentResult <- enrich(enrichedEvent).value + now = Instant.now() + _ = setDerivedContexts(enrichedEvent, enrichmentResult, now, processor) + result = enrichmentResult + .leftMap { fe => + createBadRow( + fe, + EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent), + RawEvent.toRawEvent(raw), + now, + processor + ) + } + .map(_ => enrichedEvent) + } yield result + ) + } + + private def createBadRow( + fe: NonEmptyList[NonEmptyList[Failure]], + pe: Payload.PartiallyEnrichedEvent, + re: Payload.RawEvent, + timestamp: Instant, + processor: Processor + ): BadRow = { + val firstList = fe.head + firstList.head match { + case h: Failure.SchemaViolation => + val sv = firstList.tail.collect { case f: Failure.SchemaViolation => f } + BadRow.SchemaViolations( + processor, + BadRowFailure.SchemaViolations(timestamp, NonEmptyList(h, sv).map(_.schemaViolation)), + Payload.EnrichmentPayload(pe, re) + ) + case h: Failure.EnrichmentFailure => + val ef = firstList.tail.collect { case f: Failure.EnrichmentFailure => f } + BadRow.EnrichmentFailures( + processor, + BadRowFailure.EnrichmentFailures(timestamp, NonEmptyList(h, ef).map(_.enrichmentFailure)), + Payload.EnrichmentPayload(pe, re) + ) + } + } - iorT.leftMap(_.head) + def setDerivedContexts( + enriched: EnrichedEvent, + enrichmentResult: Ior[NonEmptyList[NonEmptyList[Failure]], List[SelfDescribingData[Json]]], + timestamp: Instant, + processor: Processor + ): Unit = { + val derivedContexts = enrichmentResult.leftMap { ll => + ll.flatten.toList + .map(_.toSDJ(timestamp, processor)) + }.merge + ME.formatContexts(derivedContexts).foreach(c => enriched.derived_contexts = c) } private def mapAndValidateInput[F[_]: Sync]( @@ -126,23 +183,15 @@ object EnrichmentManager { processor: Processor, client: IgluCirceClient[F], registryLookup: RegistryLookup[F] - ): IorT[F, BadRow, IgluUtils.EventExtractResult] = { - val iorT = for { + ): IorT[F, NonEmptyList[Failure], IgluUtils.EventExtractResult] = + for { _ <- setupEnrichedEvent[F](raw, enrichedEvent, etlTstamp, processor) .leftMap(NonEmptyList.one) - extract <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, registryLookup) + extract <- IgluUtils + .extractAndValidateInputJsons(enrichedEvent, client, registryLookup) + .leftMap { l: NonEmptyList[Failure] => l } } yield extract - iorT.leftMap { violations => - buildSchemaViolationsBadRow( - violations, - EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent), - RawEvent.toRawEvent(raw), - processor - ) - } - } - /** * Run all the enrichments * @param enriched /!\ MUTABLE enriched event, mutated IN-PLACE /!\ @@ -151,13 +200,12 @@ object EnrichmentManager { */ private def runEnrichments[F[_]: Monad]( registry: EnrichmentRegistry[F], - processor: Processor, raw: RawEvent, enriched: EnrichedEvent, inputContexts: List[SelfDescribingData[Json]], unstructEvent: Option[SelfDescribingData[Json]], legacyOrder: Boolean - ): IorT[F, BadRow, List[SelfDescribingData[Json]]] = + ): IorT[F, NonEmptyList[Failure], List[SelfDescribingData[Json]]] = IorT { accState(registry, raw, inputContexts, unstructEvent, legacyOrder) .runS(Accumulation(enriched, Nil, Nil)) @@ -166,12 +214,7 @@ object EnrichmentManager { failures.toNel match { case Some(nel) => Ior.both( - buildEnrichmentFailuresBadRow( - nel, - EnrichedEvent.toPartiallyEnrichedEvent(enriched), - RawEvent.toRawEvent(raw), - processor - ), + nel.map(Failure.EnrichmentFailure), contexts ) case None => @@ -182,33 +225,19 @@ object EnrichmentManager { private def validateEnriched[F[_]: Clock: Monad]( enriched: EnrichedEvent, - raw: RawEvent, enrichmentsContexts: List[SelfDescribingData[Json]], - validationInfoContexts: List[SelfDescribingData[Json]], client: IgluCirceClient[F], - processor: Processor, registryLookup: RegistryLookup[F], acceptInvalid: Boolean, invalidCount: F[Unit], atomicFields: AtomicFields - ): IorT[F, BadRow, Unit] = { - val iorT = for { + ): IorT[F, NonEmptyList[Failure], List[SelfDescribingData[Json]]] = + for { validContexts <- IgluUtils.validateEnrichmentsContexts[F](client, enrichmentsContexts, registryLookup) - _ = ME.formatContexts(validContexts ::: validationInfoContexts).foreach(enriched.derived_contexts = _) _ <- AtomicFieldsLengthValidator .validate[F](enriched, acceptInvalid, invalidCount, atomicFields) - .leftMap(NonEmptyList.one) - } yield () - - iorT.leftMap { violations => - buildSchemaViolationsBadRow( - violations, - EnrichedEvent.toPartiallyEnrichedEvent(enriched), - RawEvent.toRawEvent(raw), - processor - ) - } - } + .leftMap { v: Failure => NonEmptyList.one(v) } + } yield validContexts private[enrichments] case class Accumulation( event: EnrichedEvent, @@ -336,7 +365,7 @@ object EnrichmentManager { e: EnrichedEvent, etlTstamp: DateTime, processor: Processor - ): IorT[F, FailureDetails.SchemaViolation, Unit] = + ): IorT[F, Failure.SchemaViolation, Unit] = IorT { Sync[F].delay { e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter @@ -847,30 +876,6 @@ object EnrichmentManager { } } - private def buildSchemaViolationsBadRow( - vs: NonEmptyList[FailureDetails.SchemaViolation], - pee: Payload.PartiallyEnrichedEvent, - re: Payload.RawEvent, - processor: Processor - ): BadRow.SchemaViolations = - BadRow.SchemaViolations( - processor, - Failure.SchemaViolations(Instant.now(), vs), - Payload.EnrichmentPayload(pee, re) - ) - - private def buildEnrichmentFailuresBadRow( - fs: NonEmptyList[FailureDetails.EnrichmentFailure], - pee: Payload.PartiallyEnrichedEvent, - re: Payload.RawEvent, - processor: Processor - ) = - BadRow.EnrichmentFailures( - processor, - Failure.EnrichmentFailures(Instant.now(), fs), - Payload.EnrichmentPayload(pee, re) - ) - private implicit class IorTOps[F[_], A, B](val iorT: IorT[F, A, B]) extends AnyVal { /** If the incomplete events feature is disabled, then convert a Both to a Left, so we don't waste time with next steps */ diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/Failure.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/Failure.scala new file mode 100644 index 000000000..6e982866a --- /dev/null +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/Failure.scala @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2024-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ +package com.snowplowanalytics.snowplow.enrich.common.enrichments + +import java.time.Instant + +import cats.syntax.option._ + +import io.circe.{Encoder, Json} +import io.circe.generic.semiauto._ +import io.circe.syntax._ + +import com.snowplowanalytics.snowplow.badrows._ + +import com.snowplowanalytics.iglu.client.ClientError +import com.snowplowanalytics.iglu.client.validator.{ValidatorError, ValidatorReport} + +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.iglu.core.circe.implicits.schemaKeyCirceJsonEncoder + +/** + * Represents a failure encountered during enrichment of the event. + * Failure entities will be attached to incomplete events as derived contexts. + */ +sealed trait Failure { + def toSDJ(timestamp: Instant, processor: Processor): SelfDescribingData[Json] +} + +object Failure { + + val failureSchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "failure", "jsonschema", SchemaVer.Full(1, 0, 0)) + + case class SchemaViolation( + schemaViolation: FailureDetails.SchemaViolation, + source: String, + data: Json + ) extends Failure { + def toSDJ(timestamp: Instant, processor: Processor): SelfDescribingData[Json] = { + val feJson = fromSchemaViolation(this, timestamp, processor) + SelfDescribingData(failureSchemaKey, feJson.asJson) + } + + } + + case class EnrichmentFailure( + enrichmentFailure: FailureDetails.EnrichmentFailure + ) extends Failure { + def toSDJ(timestamp: Instant, processor: Processor): SelfDescribingData[Json] = { + val feJson = fromEnrichmentFailure(this, timestamp, processor) + SelfDescribingData(failureSchemaKey, feJson.asJson) + } + } + + case class FailureContext( + failureType: String, + errors: List[Json], + schema: Option[SchemaKey], + data: Option[Json], + timestamp: Instant, + componentName: String, + componentVersion: String + ) + + object FailureContext { + implicit val failureContextEncoder: Encoder[FailureContext] = deriveEncoder[FailureContext] + } + + def fromEnrichmentFailure( + ef: EnrichmentFailure, + timestamp: Instant, + processor: Processor + ): FailureContext = { + val failureType = s"EnrichmentError: ${ef.enrichmentFailure.enrichment.map(_.identifier).getOrElse("")}" + val schemaKey = ef.enrichmentFailure.enrichment.map(_.schemaKey) + val (errors, data) = ef.enrichmentFailure.message match { + case FailureDetails.EnrichmentFailureMessage.InputData(field, value, expectation) => + ( + List( + Json.obj( + "message" := s"$field - $expectation", + "source" := field + ) + ), + Json.obj(field := value).some + ) + case FailureDetails.EnrichmentFailureMessage.Simple(error) => + ( + List( + Json.obj( + "message" := error + ) + ), + None + ) + case FailureDetails.EnrichmentFailureMessage.IgluError(_, error) => + // EnrichmentFailureMessage.IgluError isn't used anywhere in the project. + // We are return this value for completeness. + ( + List( + Json.obj( + "message" := error + ) + ), + None + ) + } + FailureContext( + failureType = failureType, + errors = errors, + schema = schemaKey, + data = data, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + } + + def fromSchemaViolation( + v: SchemaViolation, + timestamp: Instant, + processor: Processor + ): FailureContext = { + val (failureType, errors, schema, data) = v.schemaViolation match { + case FailureDetails.SchemaViolation.NotJson(_, _, err) => + val error = Json.obj("message" := err, "source" := v.source) + ("NotJSON", List(error), None, Json.obj(v.source := v.data).some) + case FailureDetails.SchemaViolation.NotIglu(_, err) => + val message = err.message("").split(":").headOption + val error = Json.obj("message" := message, "source" := v.source) + ("NotIglu", List(error), None, v.data.some) + case FailureDetails.SchemaViolation.CriterionMismatch(schemaKey, schemaCriterion) => + val message = s"Unexpected schema: ${schemaKey.toSchemaUri} does not match the criterion" + val error = Json.obj( + "message" := message, + "source" := v.source, + "criterion" := schemaCriterion.asString + ) + ("CriterionMismatch", List(error), schemaKey.some, v.data.some) + case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ResolutionError(lh)) => + val message = s"Resolution error: schema ${schemaKey.toSchemaUri} not found" + val lookupHistory = lh.toList + .map { + case (repo, lookups) => + lookups.asJson.deepMerge(Json.obj("repository" := repo.asJson)) + } + val error = Json.obj( + "message" := message, + "source" := v.source, + "lookupHistory" := lookupHistory + ) + ("ResolutionError", List(error), schemaKey.some, v.data.some) + case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidData(e), _)) => + val isAtomicField = schemaKey == AtomicFields.atomicSchema + // If error is for atomic field, we want to set the source to atomic field name. Since ValidatorReport.path + // is set to atomic field name, we are using path as source. + def source(r: ValidatorReport) = if (isAtomicField) r.path.getOrElse(v.source) else v.source + val errors = e.toList.map { r => + Json.obj( + "message" := r.message, + "source" := source(r), + "path" := r.path, + "keyword" := r.keyword, + "targets" := r.targets + ) + } + ("ValidationError", errors, schemaKey.some, v.data.some) + case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidSchema(e), _)) => + val errors = e.toList.map { r => + Json.obj( + "message" := s"Invalid schema: ${schemaKey.toSchemaUri} - ${r.message}", + "source" := v.source, + "path" := r.path + ) + } + ("ValidationError", errors, schemaKey.some, v.data.some) + } + FailureContext( + failureType = failureType, + errors = errors, + schema = schema, + data = data, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + } +} diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala index 8483b2cca..6cea198d6 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala @@ -25,6 +25,8 @@ import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.implicits._ +import com.snowplowanalytics.snowplow.enrich.common.enrichments.Failure + import com.snowplowanalytics.snowplow.badrows._ import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent @@ -51,7 +53,7 @@ object IgluUtils { enriched: EnrichedEvent, client: IgluCirceClient[F], registryLookup: RegistryLookup[F] - ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], EventExtractResult] = + ): IorT[F, NonEmptyList[Failure.SchemaViolation], EventExtractResult] = for { contexts <- extractAndValidateInputContexts(enriched, client, registryLookup) unstruct <- extractAndValidateUnstructEvent(enriched, client, registryLookup) @@ -77,9 +79,9 @@ object IgluUtils { enriched: EnrichedEvent, client: IgluCirceClient[F], registryLookup: RegistryLookup[F], - field: String = "ue_properties", + field: String = "unstruct", criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", 1, 0) - ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], Option[SdjExtractResult]] = + ): IorT[F, NonEmptyList[Failure.SchemaViolation], Option[SdjExtractResult]] = Option(enriched.unstruct_event) match { case Some(rawUnstructEvent) => val iorT = for { @@ -88,11 +90,11 @@ object IgluUtils { .leftMap(NonEmptyList.one) .toIor // Parse Json unstructured event as SelfDescribingData[Json] - unstructSDJ <- parseAndValidateSDJ(unstruct, client, registryLookup) + unstructSDJ <- parseAndValidateSDJ(unstruct, client, registryLookup, field) } yield unstructSDJ.some iorT.recoverWith { case errors => IorT.fromIor[F](Ior.Both(errors, None)) } case None => - IorT.rightT[F, NonEmptyList[FailureDetails.SchemaViolation]](none[SdjExtractResult]) + IorT.rightT[F, NonEmptyList[Failure.SchemaViolation]](none[SdjExtractResult]) } /** @@ -110,7 +112,7 @@ object IgluUtils { registryLookup: RegistryLookup[F], field: String = "contexts", criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "contexts", "jsonschema", 1, 0) - ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], List[SdjExtractResult]] = + ): IorT[F, NonEmptyList[Failure.SchemaViolation], List[SdjExtractResult]] = Option(enriched.contexts) match { case Some(rawContexts) => val iorT = for { @@ -122,7 +124,7 @@ object IgluUtils { // Parse and validate each SDJ and merge the errors contextsSdj <- contexts .traverse( - parseAndValidateSDJ(_, client, registryLookup) + parseAndValidateSDJ(_, client, registryLookup, field) .map(sdj => List(sdj)) .recoverWith { case errors => IorT.fromIor[F](Ior.Both(errors, Nil)) } ) @@ -130,7 +132,7 @@ object IgluUtils { } yield contextsSdj iorT.recoverWith { case errors => IorT.fromIor[F](Ior.Both(errors, Nil)) } case None => - IorT.rightT[F, NonEmptyList[FailureDetails.SchemaViolation]](Nil) + IorT.rightT[F, NonEmptyList[Failure.SchemaViolation]](Nil) } /** @@ -146,13 +148,16 @@ object IgluUtils { client: IgluCirceClient[F], sdjs: List[SelfDescribingData[Json]], registryLookup: RegistryLookup[F] - ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], List[SelfDescribingData[Json]]] = + ): IorT[F, NonEmptyList[Failure.SchemaViolation], List[SelfDescribingData[Json]]] = checkList(client, sdjs, registryLookup) .leftMap( _.map { - case (schemaKey, clientError) => - val f: FailureDetails.SchemaViolation = FailureDetails.SchemaViolation.IgluError(schemaKey, clientError) - f + case (sdj, clientError) => + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.IgluError(sdj.schema, clientError), + source = "derived_contexts", + data = sdj.asJson + ) } ) @@ -163,34 +168,54 @@ object IgluUtils { expectedCriterion: SchemaCriterion, client: IgluCirceClient[F], registryLookup: RegistryLookup[F] - ): EitherT[F, FailureDetails.SchemaViolation, Json] = + ): EitherT[F, Failure.SchemaViolation, Json] = for { // Parse Json string with the SDJ json <- JsonUtils .extractJson(rawJson) - .leftMap(e => FailureDetails.SchemaViolation.NotJson(field, rawJson.some, e)) + .leftMap(e => + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.NotJson(field, rawJson.some, e), + source = field, + data = rawJson.asJson + ) + ) .toEitherT[F] // Parse Json as SelfDescribingData[Json] (which contains the .data that we want) sdj <- SelfDescribingData .parse(json) - .leftMap(FailureDetails.SchemaViolation.NotIglu(json, _)) + .leftMap(e => + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.NotIglu(json, e), + source = field, + data = json + ) + ) .toEitherT[F] // Check that the schema of SelfDescribingData[Json] is the expected one _ <- if (validateCriterion(sdj, expectedCriterion)) - EitherT.rightT[F, FailureDetails.SchemaViolation](sdj) + EitherT.rightT[F, Failure.SchemaViolation](sdj) else EitherT .leftT[F, SelfDescribingData[Json]]( - FailureDetails.SchemaViolation.CriterionMismatch(sdj.schema, expectedCriterion) + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.CriterionMismatch(sdj.schema, expectedCriterion), + source = field, + data = sdj.asJson + ) ) // Check that the SDJ holding the .data is valid _ <- check(client, sdj, registryLookup) .leftMap { case (schemaKey, clientError) => - FailureDetails.SchemaViolation.IgluError(schemaKey, clientError) + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.IgluError(schemaKey, clientError), + source = field, + data = sdj.asJson + ) } // Extract .data of SelfDescribingData[Json] - data <- EitherT.rightT[F, FailureDetails.SchemaViolation](sdj.data) + data <- EitherT.rightT[F, Failure.SchemaViolation](sdj.data) } yield data /** Check that the schema of a SDJ matches the expected one */ @@ -217,11 +242,11 @@ object IgluUtils { client: IgluCirceClient[F], sdjs: List[SelfDescribingData[Json]], registryLookup: RegistryLookup[F] - ): IorT[F, NonEmptyList[(SchemaKey, ClientError)], List[SelfDescribingData[Json]]] = + ): IorT[F, NonEmptyList[(SelfDescribingData[Json], ClientError)], List[SelfDescribingData[Json]]] = sdjs.map { sdj => check(client, sdj, registryLookup) .map(_ => List(sdj)) - .leftMap(NonEmptyList.one) + .leftMap(e => NonEmptyList.one((sdj, e._2))) .toIor .recoverWith { case errors => IorT.fromIor[F](Ior.Both(errors, Nil)) } }.foldA @@ -230,19 +255,29 @@ object IgluUtils { private def parseAndValidateSDJ[F[_]: Monad: Clock]( json: Json, client: IgluCirceClient[F], - registryLookup: RegistryLookup[F] - ): IorT[F, NonEmptyList[FailureDetails.SchemaViolation], SdjExtractResult] = + registryLookup: RegistryLookup[F], + field: String + ): IorT[F, NonEmptyList[Failure.SchemaViolation], SdjExtractResult] = for { sdj <- IorT .fromEither[F](SelfDescribingData.parse(json)) - .leftMap[FailureDetails.SchemaViolation](FailureDetails.SchemaViolation.NotIglu(json, _)) + .leftMap[Failure.SchemaViolation](e => + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.NotIglu(json, e), + source = field, + data = json.asJson + ) + ) .leftMap(NonEmptyList.one) supersedingSchema <- check(client, sdj, registryLookup) .leftMap { case (schemaKey, clientError) => - FailureDetails.SchemaViolation - .IgluError(schemaKey, clientError): FailureDetails.SchemaViolation - + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation + .IgluError(schemaKey, clientError): FailureDetails.SchemaViolation, + source = field, + data = json.asJson + ) } .leftMap(NonEmptyList.one) .toIor diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala index 6bf35e859..c8a0b2fe4 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala @@ -65,6 +65,16 @@ object SpecHelpers extends CatsEffect { } } }, + { + "name": "Temp Iglu Central", + "priority": 0, + "vendorPrefixes": [], + "connection": { + "http": { + "uri": "https://raw.githubusercontent.com/snowplow/iglu-central/incomplete-events-schema/" + } + } + }, { "name": "Embedded src/test/resources", "priority": 100, @@ -129,15 +139,17 @@ object SpecHelpers extends CatsEffect { .flatMap(SelfDescribingData.parse[Json]) .leftMap(err => s"Can't parse Json [$rawJson] as as SelfDescribingData, error: [$err]") - def listContextsSchemas(rawContexts: String): List[SchemaKey] = + def listContexts(rawContexts: String): List[SelfDescribingData[Json]] = jsonStringToSDJ(rawContexts) .map(_.data.asArray.get.toList) - .flatMap(contexts => contexts.traverse(c => SelfDescribingData.parse[Json](c).map(_.schema))) match { + .flatMap(contexts => contexts.traverse(c => SelfDescribingData.parse[Json](c))) match { case Left(err) => throw new IllegalArgumentException(s"Couldn't list contexts schemas. Error: [$err]") - case Right(schemas) => schemas + case Right(sdjs) => sdjs } + def listContextsSchemas(rawContexts: String): List[SchemaKey] = listContexts(rawContexts).map(_.schema) + def getUnstructSchema(rawUnstruct: String): SchemaKey = jsonStringToSDJ(rawUnstruct) .map(_.data) diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/AtomicFieldsSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/AtomicFieldsSpec.scala new file mode 100644 index 000000000..22b5e6baf --- /dev/null +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/AtomicFieldsSpec.scala @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2022-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ +package com.snowplowanalytics.snowplow.enrich.common.enrichments + +import cats.data.NonEmptyList +import cats.syntax.option._ + +import io.circe.Json +import io.circe.syntax._ + +import com.snowplowanalytics.iglu.client.ClientError.ValidationError +import com.snowplowanalytics.iglu.client.validator.{ValidatorError, ValidatorReport} +import com.snowplowanalytics.snowplow.badrows.FailureDetails + +import org.specs2.mutable.Specification + +class AtomicFieldsSpec extends Specification { + + "errorsToSchemaViolation" should { + "convert ValidatorReports to SchemaViolation correctly" >> { + val vrList = NonEmptyList( + ValidatorReport(message = "testMessage", path = "testPath1".some, targets = List("t1, t2"), keyword = "testKeyword1".some), + List( + ValidatorReport(message = "testMessage", path = None, targets = List.empty, keyword = "testKeyword2".some), + ValidatorReport(message = "testMessage", path = "testPath3".some, targets = List("t1", "t2"), keyword = None), + ValidatorReport(message = "testMessage", path = "testPath4".some, targets = List.empty, keyword = "testKeyword4".some) + ) + ) + val expected = Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.IgluError( + schemaKey = AtomicFields.atomicSchema, + error = ValidationError(ValidatorError.InvalidData(vrList), None) + ), + source = "atomic_field", + data = Json.obj( + "testPath1" := "testKeyword1", + "testPath3" := Json.Null, + "testPath4" := "testKeyword4" + ) + ) + val result = AtomicFields.errorsToSchemaViolation(vrList) + result must beEqualTo(expected) + } + } +} diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala index fd1aff96c..fe8317c5d 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala @@ -10,6 +10,8 @@ */ package com.snowplowanalytics.snowplow.enrich.common.enrichments +import java.time.Instant + import org.apache.commons.codec.digest.DigestUtils import org.specs2.mutable.Specification @@ -29,8 +31,10 @@ import io.circe.syntax._ import org.joda.time.DateTime import com.snowplowanalytics.snowplow.badrows._ +import com.snowplowanalytics.snowplow.badrows.{Failure => BadRowFailure} import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.enrich.common.QueryStringParameters import com.snowplowanalytics.snowplow.enrich.common.loaders._ @@ -175,10 +179,10 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.SchemaViolations( _, - Failure.SchemaViolations(_, - NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), - List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) - ) + BadRowFailure.SchemaViolations(_, + NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), + List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) + ) ), _ ) @@ -242,7 +246,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.EnrichmentFailures( _, - Failure.EnrichmentFailures( + BadRowFailure.EnrichmentFailures( _, NonEmptyList( FailureDetails.EnrichmentFailure( @@ -311,7 +315,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.SchemaViolations( _, - Failure.SchemaViolations( + BadRowFailure.SchemaViolations( _, NonEmptyList( _: FailureDetails.SchemaViolation.IgluError, @@ -1237,6 +1241,15 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE JavascriptScriptEnrichment(schemaKey, script) ) ) + val invalidUe = + """{ + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "emailAddress": "hello@world.com", + "emailAddress2": "foo@bar.org", + "unallowedAdditionalField": "foo@bar.org" + } + }""" val parameters = Map( "e" -> "pp", @@ -1252,20 +1265,34 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE } """, "ue_pr" -> - """ + s""" { "schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", - "data":{ - "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", - "data": { - "emailAddress": "hello@world.com", - "emailAddress2": "foo@bar.org", - "unallowedAdditionalField": "foo@bar.org" - } - } + "data":$invalidUe }""" ).toOpt val rawEvent = RawEvent(api, parameters, None, source, context) + def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = { + val emailSentSDJ = SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get + SpecHelpers.listContexts(enriched.derived_contexts) match { + case List(SelfDescribingData(Failure.`failureSchemaKey`, feJson), `emailSentSDJ`) + if feJson.field("failureType") == "ValidationError".asJson && + feJson.field("errors") == Json.arr( + Json.obj( + "message" := "$.unallowedAdditionalField: is not defined in the schema and the schema does not allow additional properties", + "source" := "unstruct", + "path" := "$", + "keyword" := "additionalProperties", + "targets" := List("unallowedAdditionalField") + ) + ) && + feJson.field("schema") == emailSentSchema.asJson && + feJson.field("data") == jparse(invalidUe).toOption.get => + true + case _ => false + } + } + val enriched = EnrichmentManager.enrichEvent[IO]( enrichmentReg, client, @@ -1278,11 +1305,12 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE atomicFieldLimits, emitIncomplete = true ) + enriched.value.map { case Ior.Both(_: BadRow.SchemaViolations, enriched) if Option(enriched.unstruct_event).isEmpty && SpecHelpers.listContextsSchemas(enriched.contexts) == List(clientSessionSchema) && - SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + expectedDerivedContexts(enriched) => ok case other => ko(s"[$other] is not a SchemaViolations bad row and an enriched event without the unstructured event") } @@ -1305,24 +1333,24 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE JavascriptScriptEnrichment(schemaKey, script) ) ) + val invalidContext = + """{ + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "foo": "hello@world.com", + "emailAddress2": "foo@bar.org" + } + }""" val parameters = Map( "e" -> "pp", "tv" -> "js-0.13.1", "p" -> "web", "co" -> - """ + s""" { "schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0", - "data": [ - { - "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", - "data": { - "foo": "hello@world.com", - "emailAddress2": "foo@bar.org" - } - } - ] + "data": [$invalidContext] } """, "ue_pr" -> @@ -1333,6 +1361,34 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE }""" ).toOpt val rawEvent = RawEvent(api, parameters, None, source, context) + def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = { + val emailSentSDJ = SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get + SpecHelpers.listContexts(enriched.derived_contexts) match { + case List(SelfDescribingData(Failure.`failureSchemaKey`, feJson), `emailSentSDJ`) + if feJson.field("failureType") == "ValidationError".asJson && + feJson.field("errors") == Json.arr( + Json.obj( + "message" := "$.emailAddress: is missing but it is required", + "source" := "contexts", + "path" := "$", + "keyword" := "required", + "targets" := List("emailAddress") + ), + Json.obj( + "message" := "$.foo: is not defined in the schema and the schema does not allow additional properties", + "source" := "contexts", + "path" := "$", + "keyword" := "additionalProperties", + "targets" := List("foo") + ) + ) && + feJson.field("schema") == emailSentSchema.asJson && + feJson.field("data") == jparse(invalidContext).toOption.get => + true + case _ => false + } + } + val enriched = EnrichmentManager.enrichEvent[IO]( enrichmentReg, client, @@ -1349,7 +1405,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Both(_: BadRow.SchemaViolations, enriched) if Option(enriched.contexts).isEmpty && SpecHelpers.getUnstructSchema(enriched.unstruct_event) == clientSessionSchema && - SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + expectedDerivedContexts(enriched) => ok case other => ko(s"[$other] is not a SchemaViolations bad row and an enriched event with no input contexts") } @@ -1372,6 +1428,15 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE JavascriptScriptEnrichment(schemaKey, script) ) ) + val invalidContext = + """ + { + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "foo": "hello@world.com", + "emailAddress2": "foo@bar.org" + } + }""" val parameters = Map( "e" -> "pp", @@ -1382,13 +1447,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE { "schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0", "data": [ - { - "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", - "data": { - "foo": "hello@world.com", - "emailAddress2": "foo@bar.org" - } - }, + $invalidContext, $clientSession ] } @@ -1401,6 +1460,34 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE }""" ).toOpt val rawEvent = RawEvent(api, parameters, None, source, context) + def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = { + val emailSentSDJ = SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get + SpecHelpers.listContexts(enriched.derived_contexts) match { + case List(SelfDescribingData(Failure.`failureSchemaKey`, feJson), `emailSentSDJ`) + if feJson.field("failureType") == "ValidationError".asJson && + feJson.field("errors") == Json.arr( + Json.obj( + "message" := "$.emailAddress: is missing but it is required", + "source" := "contexts", + "path" := "$", + "keyword" := "required", + "targets" := List("emailAddress") + ), + Json.obj( + "message" := "$.foo: is not defined in the schema and the schema does not allow additional properties", + "source" := "contexts", + "path" := "$", + "keyword" := "additionalProperties", + "targets" := List("foo") + ) + ) && + feJson.field("schema") == emailSentSchema.asJson && + feJson.field("data") == jparse(invalidContext).toOption.get => + true + case _ => false + } + } + val enriched = EnrichmentManager.enrichEvent[IO]( enrichmentReg, client, @@ -1417,7 +1504,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Both(_: BadRow.SchemaViolations, enriched) if SpecHelpers.getUnstructSchema(enriched.unstruct_event) == clientSessionSchema && SpecHelpers.listContextsSchemas(enriched.contexts) == List(clientSessionSchema) && - SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + expectedDerivedContexts(enriched) => ok case other => ko(s"[$other] is not a SchemaViolations bad row and an enriched event with 1 input context") } @@ -1455,6 +1542,24 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE }""" ).toOpt val rawEvent = RawEvent(api, parameters, None, source, context) + def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = + SpecHelpers.listContexts(enriched.derived_contexts) match { + case List( + SelfDescribingData(Failure.`failureSchemaKey`, feJson), + SelfDescribingData(SchemaKey("nl.basjes", "yauaa_context", "jsonschema", _), _) + ) + if feJson.field("failureType") == "EnrichmentError: Javascript enrichment".asJson && + feJson.field("errors") == Json.arr( + Json.obj( + "message" := "Error during execution of JavaScript function: [Javascript exception in at line number 3 at column number 10]" + ) + ) && + feJson.field("schema") == JavascriptScriptEnrichment.supportedSchema.copy(addition = 0.some).asString.asJson && + feJson.field("data") == Json.Null => + true + case _ => false + } + val enriched = EnrichmentManager.enrichEvent[IO]( enrichmentReg, client, @@ -1470,7 +1575,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE enriched.value.map { case Ior.Both(_: BadRow.EnrichmentFailures, enriched) if SpecHelpers.getUnstructSchema(enriched.unstruct_event) == clientSessionSchema && - !SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + expectedDerivedContexts(enriched) => ok case other => ko(s"[$other] is not an EnrichmentFailures bad row and an enriched event") } @@ -1508,6 +1613,36 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE }""" ).toOpt val rawEvent = RawEvent(api, parameters, None, source, context) + def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = + SpecHelpers.listContexts(enriched.derived_contexts) match { + case List( + SelfDescribingData(Failure.`failureSchemaKey`, validationError), + SelfDescribingData(Failure.`failureSchemaKey`, enrichmentError) + ) + if validationError.field("failureType") == "ValidationError".asJson && + validationError.field("errors") == Json.arr( + Json.obj( + "message" := "Cannot be converted to java.math.BigDecimal. Error : Character f is neither a decimal digit number, decimal point, nor \"e\" notation exponential mark.", + "source" := "tr_tt", + "path" := "tr_tt", + "keyword" := "foo", + "targets" := Json.arr() + ) + ) && + validationError.field("schema") == AtomicFields.atomicSchema.asJson && + validationError.field("data") == Json.obj("tr_tt" := "foo") && + enrichmentError.field("failureType") == "EnrichmentError: Javascript enrichment".asJson && + enrichmentError.field("errors") == Json.arr( + Json.obj( + "message" := "Error during execution of JavaScript function: [Javascript exception in at line number 3 at column number 10]" + ) + ) && + enrichmentError.field("schema") == JavascriptScriptEnrichment.supportedSchema.copy(addition = 0.some).asString.asJson && + enrichmentError.field("data") == Json.Null => + true + case _ => false + } + val enriched = EnrichmentManager.enrichEvent[IO]( enrichmentReg, client, @@ -1521,23 +1656,26 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE emitIncomplete = true ) enriched.value.map { - case Ior.Both(_: BadRow.SchemaViolations, _) => ok + case Ior.Both(_: BadRow.SchemaViolations, enriched) if expectedDerivedContexts(enriched) => ok case other => ko(s"[$other] doesn't have a SchemaViolations bad row in the Left") } } "remove an invalid enrichment context and return the enriched event if emitIncomplete is set to true" >> { + val invalidContext = + """ + { + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "foo": "hello@world.com", + "emailAddress2": "foo@bar.org" + } + }""" val script = s""" function process(event) { return [ - { - "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", - "data": { - "foo": "hello@world.com", - "emailAddress2": "foo@bar.org" - } - } + $invalidContext ]; }""" val schemaKey = SchemaKey( @@ -1565,6 +1703,35 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE }""" ).toOpt val rawEvent = RawEvent(api, parameters, None, source, context) + def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = + SpecHelpers.listContexts(enriched.derived_contexts) match { + case List( + SelfDescribingData(Failure.`failureSchemaKey`, feJson), + SelfDescribingData(SchemaKey("nl.basjes", "yauaa_context", "jsonschema", _), _) + ) + if feJson.field("failureType") == "ValidationError".asJson && + feJson.field("errors") == Json.arr( + Json.obj( + "message" := "$.emailAddress: is missing but it is required", + "source" := "derived_contexts", + "path" := "$", + "keyword" := "required", + "targets" := List("emailAddress") + ), + Json.obj( + "message" := "$.foo: is not defined in the schema and the schema does not allow additional properties", + "source" := "derived_contexts", + "path" := "$", + "keyword" := "additionalProperties", + "targets" := List("foo") + ) + ) && + feJson.field("schema") == emailSentSchema.asJson && + feJson.field("data") == jparse(invalidContext).toOption.get => + true + case _ => false + } + val enriched = EnrichmentManager.enrichEvent[IO]( enrichmentReg, client, @@ -1580,11 +1747,97 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE enriched.value.map { case Ior.Both(_: BadRow.SchemaViolations, enriched) if SpecHelpers.getUnstructSchema(enriched.unstruct_event) == clientSessionSchema && - !SpecHelpers.listContextsSchemas(enriched.derived_contexts).contains(emailSentSchema) => + expectedDerivedContexts(enriched) => ok case other => ko(s"[$other] is not a SchemaViolations bad row and an enriched event without the faulty enrichment context") } } + + "return a bad row that contains validation errors only from ue if there is validation error in both ue and derived contexts when emitIncomplete is set to true" >> { + val invalidContext = + """ + { + "schema":"iglu:com.acme/email_sent/jsonschema/1-0-0", + "data": { + "foo": "hello@world.com", + "emailAddress2": "foo@bar.org" + } + }""" + val invalidUe = + """{ + "schema":"iglu:com.snowplowanalytics.snowplow/client_session/jsonschema/1-0-1", + "data": { + "unallowedAdditionalField": "foo@bar.org" + } + }""" + val script = + s""" + function process(event) { + return [ + $invalidContext + ]; + }""" + val schemaKey = SchemaKey( + "com.snowplowanalytics.snowplow", + "javascript_script_config", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val enrichmentReg = EnrichmentRegistry[IO]( + javascriptScript = List( + JavascriptScriptEnrichment(schemaKey, script) + ) + ) + val parameters = Map( + "e" -> "pp", + "tv" -> "js-0.13.1", + "p" -> "web", + "ue_pr" -> + s""" + { + "schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", + "data": $invalidUe + }""" + ).toOpt + val rawEvent = RawEvent(api, parameters, None, source, context) + val enriched = EnrichmentManager.enrichEvent[IO]( + enrichmentReg, + client, + processor, + timestamp, + rawEvent, + AcceptInvalid.featureFlags, + IO.unit, + SpecHelpers.registryLookup, + atomicFieldLimits, + emitIncomplete = true + ) + def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = + SpecHelpers.listContextsSchemas(enriched.derived_contexts).count(_ == Failure.failureSchemaKey) == 2 + + def expectedBadRow(badRow: BadRow): Boolean = + badRow match { + case BadRow.SchemaViolations( + _, + BadRowFailure.SchemaViolations( + _, + NonEmptyList(FailureDetails.SchemaViolation.IgluError(`clientSessionSchema`, _), Nil) + ), + _ + ) => + true + case _ => false + } + + enriched.value.map { + case Ior.Both(badRow, enriched) + if Option(enriched.unstruct_event).isEmpty && + expectedDerivedContexts(enriched) && + expectedBadRow(badRow) => + ok + case other => ko(s"[$other] is not expected one") + } + } } "getCrossDomain" should { @@ -2066,7 +2319,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.SchemaViolations( _, - Failure.SchemaViolations(_, NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey, clientError), Nil)), + BadRowFailure.SchemaViolations(_, NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey, clientError), Nil)), _ ) ) => @@ -2147,10 +2400,10 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE case Ior.Left( BadRow.SchemaViolations( _, - Failure.SchemaViolations(_, - NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), - List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) - ) + BadRowFailure.SchemaViolations(_, + NonEmptyList(FailureDetails.SchemaViolation.IgluError(schemaKey1, clientError1), + List(FailureDetails.SchemaViolation.IgluError(schemaKey2, clientError2)) + ) ), _ ) @@ -2165,6 +2418,50 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE } } } + + "setDerivedContexts" should { + val sv = Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.NotJson("testField", "testValue".some, "testError"), + source = "testSource", + data = Json.obj("testKey" := "testValue") + ) + val ef = Failure.EnrichmentFailure( + FailureDetails.EnrichmentFailure( + None, + FailureDetails.EnrichmentFailureMessage.Simple("testError") + ) + ) + val emailSentSDJ = SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get + val timestamp = Instant.now() + "set derived contexts correctly if enrichment result is Ior.Left" >> { + val enriched = new EnrichedEvent() + val enrichmentResult = Ior.Left(NonEmptyList.of(NonEmptyList.of(sv, ef), NonEmptyList.of(sv, ef))) + EnrichmentManager.setDerivedContexts(enriched, enrichmentResult, timestamp, processor) + val schemas = SpecHelpers.listContextsSchemas(enriched.derived_contexts) + schemas.size must beEqualTo(4) + forall(schemas)(s => s must beEqualTo(Failure.failureSchemaKey)) + } + "set derived contexts correctly if enrichment result is Ior.Right" >> { + val enriched = new EnrichedEvent() + val enrichmentResult = Ior.Right(List(emailSentSDJ, emailSentSDJ)) + EnrichmentManager.setDerivedContexts(enriched, enrichmentResult, timestamp, processor) + val schemas = SpecHelpers.listContextsSchemas(enriched.derived_contexts) + schemas.size must beEqualTo(2) + forall(schemas)(s => s must beEqualTo(emailSentSchema)) + } + "set derived contexts correctly if enrichment result is Ior.Both" >> { + val enriched = new EnrichedEvent() + val enrichmentResult = Ior.Both( + NonEmptyList.of(NonEmptyList.of(sv, ef), NonEmptyList.of(sv, ef)), + List(emailSentSDJ, emailSentSDJ) + ) + EnrichmentManager.setDerivedContexts(enriched, enrichmentResult, timestamp, processor) + val schemas = SpecHelpers.listContextsSchemas(enriched.derived_contexts) + schemas.size must beEqualTo(6) + schemas.count(_ == Failure.failureSchemaKey) must beEqualTo(4) + schemas.count(_ == emailSentSchema) must beEqualTo(2) + } + } } object EnrichmentManagerSpec { @@ -2267,4 +2564,9 @@ object EnrichmentManagerSpec { "userId": "20d631b8-7837-49df-a73e-6da73154e6fd" } }""" + + implicit class JsonFieldGetter(json: Json) { + def field(f: String): Json = + json.hcursor.downField(f).as[Json].toOption.get + } } diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/FailureSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/FailureSpec.scala new file mode 100644 index 000000000..678d65d51 --- /dev/null +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/FailureSpec.scala @@ -0,0 +1,392 @@ +/* + * Copyright (c) 2014-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ +package com.snowplowanalytics.snowplow.enrich.common.enrichments + +import java.time.Instant + +import scala.collection.immutable.SortedMap + +import cats.effect.testing.specs2.CatsEffect +import cats.effect.unsafe.implicits.global +import cats.effect.IO + +import cats.data.NonEmptyList +import cats.syntax.option._ + +import io.circe.syntax._ +import io.circe.Json + +import org.specs2.mutable.Specification +import org.specs2.matcher.ValidatedMatchers +import org.specs2.ScalaCheck + +import org.scalacheck.{Gen, Prop} + +import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers +import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Processor} +import com.snowplowanalytics.iglu.core.{ParseError, SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.iglu.client.ClientError +import com.snowplowanalytics.iglu.client.validator.{ValidatorError, ValidatorReport} +import com.snowplowanalytics.iglu.client.resolver.LookupHistory +import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup + +class FailureSpec extends Specification with ValidatedMatchers with CatsEffect with ScalaCheck { + + val timestamp = Instant.now() + val processor = Processor("unit tests SCE", "v42") + val schemaKey = SchemaKey("com.snowplowanalytics", "test", "jsonschema", SchemaVer.Full(1, 0, 0)) + val schemaCriterion = SchemaCriterion.apply("com.snowplowanalytics", "test", "jsonschema", 1) + + "FailureEntityContext should be valid against its schema" >> { + implicit val registryLookup: RegistryLookup[IO] = SpecHelpers.registryLookup + + val genFeContext = for { + failureType <- Gen.alphaNumStr + jsonGen = Gen.oneOf( + Json.obj(), + Json.obj("test1" := "value1"), + Json.obj("test1" := "value1", "test2" := "value2"), + Json.obj("test1" := "value1", "test2" := "value2", "test3" := "value3") + ) + errors <- Gen.listOf(jsonGen) + data <- Gen.option(jsonGen) + schema <- Gen.option(Gen.const(schemaKey)) + } yield Failure.FailureContext( + failureType = failureType, + errors = errors, + schema = schema, + data = data, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + + Prop.forAll(genFeContext) { feContext: Failure.FailureContext => + val sdj = SelfDescribingData(schema = Failure.failureSchemaKey, data = feContext.asJson) + SpecHelpers.client + .check(sdj) + .value + .map(_ must beRight) + .unsafeRunSync() + } + } + + "fromEnrichmentFailure" should { + "convert InputData correctly" >> { + val ef = Failure.EnrichmentFailure( + enrichmentFailure = FailureDetails.EnrichmentFailure( + enrichment = FailureDetails + .EnrichmentInformation( + schemaKey = schemaKey, + identifier = "enrichmentId" + ) + .some, + message = FailureDetails.EnrichmentFailureMessage.InputData( + field = "testField", + value = "testValue".some, + expectation = "testExpectation" + ) + ) + ) + val result = Failure.fromEnrichmentFailure(ef, timestamp, processor) + val expected = Failure.FailureContext( + failureType = "EnrichmentError: enrichmentId", + errors = List( + Json.obj( + "message" := "testField - testExpectation", + "source" := "testField" + ) + ), + schema = schemaKey.some, + data = Json.obj("testField" := "testValue").some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + result must beEqualTo(expected) + } + + "convert Simple correctly" >> { + val ef = Failure.EnrichmentFailure( + enrichmentFailure = FailureDetails.EnrichmentFailure( + enrichment = FailureDetails + .EnrichmentInformation( + schemaKey = schemaKey, + identifier = "enrichmentId" + ) + .some, + message = FailureDetails.EnrichmentFailureMessage.Simple(error = "testError") + ) + ) + val result = Failure.fromEnrichmentFailure(ef, timestamp, processor) + val expected = Failure.FailureContext( + failureType = "EnrichmentError: enrichmentId", + errors = List(Json.obj("message" := "testError")), + schema = schemaKey.some, + data = None, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + result must beEqualTo(expected) + } + } + + "fromSchemaViolation" should { + "convert NotJson correctly" >> { + val sv = Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.NotJson( + field = "testField", + value = "testValue".some, + error = "testError" + ), + source = "testSource", + data = "testData".asJson + ) + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( + failureType = "NotJSON", + errors = List( + Json.obj( + "message" := "testError", + "source" := "testSource" + ) + ), + schema = None, + data = Json.obj("testSource" := "testData").some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + fe must beEqualTo(expected) + } + + "convert NotIglu correctly" >> { + val sv = Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.NotIglu( + json = Json.Null, + error = ParseError.InvalidSchema + ), + source = "testSource", + data = "testData".asJson + ) + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( + failureType = "NotIglu", + errors = List( + Json.obj( + "message" := "Invalid schema", + "source" := "testSource" + ) + ), + schema = None, + data = "testData".asJson.some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + fe must beEqualTo(expected) + } + + "convert CriterionMismatch correctly" >> { + val sv = Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.CriterionMismatch( + schemaKey = schemaKey, + schemaCriterion = schemaCriterion + ), + source = "testSource", + data = "testData".asJson + ) + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( + failureType = "CriterionMismatch", + errors = List( + Json.obj( + "message" := "Unexpected schema: iglu:com.snowplowanalytics/test/jsonschema/1-0-0 does not match the criterion", + "source" := "testSource", + "criterion" := "iglu:com.snowplowanalytics/test/jsonschema/1-*-*" + ) + ), + schema = schemaKey.some, + data = "testData".asJson.some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + fe must beEqualTo(expected) + } + + "convert ResolutionError correctly" >> { + val sv = Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.IgluError( + schemaKey = schemaKey, + error = ClientError.ResolutionError( + value = SortedMap( + "repo1" -> LookupHistory( + errors = Set.empty, + attempts = 1, + lastAttempt = timestamp + ), + "repo2" -> LookupHistory( + errors = Set.empty, + attempts = 2, + lastAttempt = timestamp + ) + ) + ) + ), + source = "testSource", + data = "testData".asJson + ) + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( + failureType = "ResolutionError", + errors = List( + Json.obj( + "message" := "Resolution error: schema iglu:com.snowplowanalytics/test/jsonschema/1-0-0 not found", + "source" := "testSource", + "lookupHistory" := Json.arr( + Json.obj("repository" := "repo1", "errors" := List.empty[String], "attempts" := 1, "lastAttempt" := timestamp), + Json.obj("repository" := "repo2", "errors" := List.empty[String], "attempts" := 2, "lastAttempt" := timestamp) + ) + ) + ), + schema = schemaKey.some, + data = "testData".asJson.some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + fe must beEqualTo(expected) + } + + "convert InvalidData correctly" >> { + def createSv(schemaKey: SchemaKey) = + Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.IgluError( + schemaKey = schemaKey, + error = ClientError.ValidationError( + error = ValidatorError.InvalidData( + messages = NonEmptyList.of( + ValidatorReport(message = "testMessage1", + path = "testPath1".some, + targets = List("testTarget1"), + keyword = "testKeyword1".some + ), + ValidatorReport(message = "testMessage2", + path = "testPath2".some, + targets = List("testTarget2"), + keyword = "testKeyword2".some + ) + ) + ), + supersededBy = None + ) + ), + source = "testSource", + data = "testData".asJson + ) + + val svWithAtomicSchema = createSv(AtomicFields.atomicSchema) + val svWithOrdinarySchema = createSv(schemaKey) + val feWithAtomicSchema = Failure.fromSchemaViolation(svWithAtomicSchema, timestamp, processor) + val feWithOrdinarySchema = Failure.fromSchemaViolation(svWithOrdinarySchema, timestamp, processor) + val expectedWithAtomicSchema = Failure.FailureContext( + failureType = "ValidationError", + errors = List( + Json.obj("message" := "testMessage1", + "source" := "testPath1", + "path" := "testPath1", + "keyword" := "testKeyword1", + "targets" := List("testTarget1") + ), + Json.obj("message" := "testMessage2", + "source" := "testPath2", + "path" := "testPath2", + "keyword" := "testKeyword2", + "targets" := List("testTarget2") + ) + ), + schema = AtomicFields.atomicSchema.some, + data = "testData".asJson.some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + val expectedWithOrdinarySchema = Failure.FailureContext( + failureType = "ValidationError", + errors = List( + Json.obj("message" := "testMessage1", + "source" := "testSource", + "path" := "testPath1", + "keyword" := "testKeyword1", + "targets" := List("testTarget1") + ), + Json.obj("message" := "testMessage2", + "source" := "testSource", + "path" := "testPath2", + "keyword" := "testKeyword2", + "targets" := List("testTarget2") + ) + ), + schema = schemaKey.some, + data = "testData".asJson.some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + + feWithAtomicSchema must beEqualTo(expectedWithAtomicSchema) + feWithOrdinarySchema must beEqualTo(expectedWithOrdinarySchema) + } + + "convert InvalidSchema correctly" >> { + val sv = Failure.SchemaViolation( + schemaViolation = FailureDetails.SchemaViolation.IgluError( + schemaKey = schemaKey, + error = ClientError.ValidationError( + error = ValidatorError.InvalidSchema( + issues = NonEmptyList.of( + ValidatorError.SchemaIssue(path = "testPath1", message = "testMessage1"), + ValidatorError.SchemaIssue(path = "testPath2", message = "testMessage2") + ) + ), + supersededBy = None + ) + ), + source = "testSource", + data = "testData".asJson + ) + val fe = Failure.fromSchemaViolation(sv, timestamp, processor) + val expected = Failure.FailureContext( + failureType = "ValidationError", + errors = List( + Json.obj( + "message" := "Invalid schema: iglu:com.snowplowanalytics/test/jsonschema/1-0-0 - testMessage1", + "source" := "testSource", + "path" := "testPath1" + ), + Json.obj( + "message" := "Invalid schema: iglu:com.snowplowanalytics/test/jsonschema/1-0-0 - testMessage2", + "source" := "testSource", + "path" := "testPath2" + ) + ), + schema = schemaKey.some, + data = "testData".asJson.some, + timestamp = timestamp, + componentName = processor.artifact, + componentVersion = processor.version + ) + fe must beEqualTo(expected) + } + } +} diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala index 607b3a5c3..38aa5500d 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/utils/IgluUtilsSpec.scala @@ -16,6 +16,8 @@ import org.specs2.matcher.ValidatedMatchers import cats.effect.testing.specs2.CatsEffect import io.circe.parser.parse +import io.circe.Json +import io.circe.syntax._ import cats.data.{Ior, NonEmptyList} @@ -24,7 +26,9 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.iglu.client.ClientError.{ResolutionError, ValidationError} import com.snowplowanalytics.snowplow.badrows._ +import com.snowplowanalytics.snowplow.badrows.FailureDetails +import com.snowplowanalytics.snowplow.enrich.common.enrichments.Failure import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent @@ -42,7 +46,12 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect val processor = Processor("unit tests SCE", "v42") val enriched = new EnrichedEvent() + val unstructFieldName = "unstruct" + val contextsFieldName = "contexts" + val derivedContextsFieldName = "derived_contexts" + val notJson = "foo" + val jsonNotJson = notJson.asJson // Just jsonized version of the string val notIglu = """{"foo":"bar"}""" val unstructSchema = SchemaKey( @@ -142,7 +151,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect } } - "return a SchemaViolation.NotJson if unstruct_event does not contain a properly formatted JSON string" >> { + "return a FailureDetails.SchemaViolation.NotJson if unstruct_event does not contain a properly formatted JSON string" >> { val input = new EnrichedEvent input.setUnstruct_event(notJson) @@ -150,12 +159,20 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotJson, _), None) => ok + case Ior.Both( + NonEmptyList( + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotJson, `unstructFieldName`, `jsonNotJson`), + _ + ), + None + ) => + ok case other => ko(s"[$other] is not an error with NotJson") } } - "return a SchemaViolation.NotIglu if unstruct_event contains a properly formatted JSON string that is not self-describing" >> { + "return a FailureDetails.SchemaViolation.NotIglu if unstruct_event contains a properly formatted JSON string that is not self-describing" >> { + val json = notIglu.toJson val input = new EnrichedEvent input.setUnstruct_event(notIglu) @@ -163,12 +180,17 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotIglu, _), None) => ok + case Ior.Both( + NonEmptyList(Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotIglu, `unstructFieldName`, `json`), _), + None + ) => + ok case other => ko(s"[$other] is not an error with NotIglu") } } - "return a SchemaViolation.CriterionMismatch if unstruct_event contains a self-describing JSON but not with the expected schema for unstructured events" >> { + "return a FailureDetails.SchemaViolation.CriterionMismatch if unstruct_event contains a self-describing JSON but not with the expected schema for unstructured events" >> { + val json = noSchema.toJson val input = new EnrichedEvent input.setUnstruct_event(noSchema) @@ -176,66 +198,102 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.CriterionMismatch, _), None) => ok + case Ior.Both( + NonEmptyList( + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.CriterionMismatch, `unstructFieldName`, `json`), + _ + ), + None + ) => + ok case other => ko(s"[$other] is not an error with CriterionMismatch") } } - "return a SchemaViolation.NotJson if the JSON in .data is not a JSON" >> { + "return a FailureDetails.SchemaViolation.NotJson if the JSON in .data is not a JSON" >> { val input = new EnrichedEvent - input.setUnstruct_event(buildUnstruct(notJson)) + val ue = buildUnstruct(notJson) + val ueJson = ue.asJson + input.setUnstruct_event(ue) IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotJson, _), None) => ok + case Ior.Both(NonEmptyList( + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotJson, `unstructFieldName`, `ueJson`), + _ + ), + None + ) => + ok case other => ko(s"[$other] is not an error with NotJson") } } - "return a SchemaViolation.IgluError containing a ValidationError if the JSON in .data is not self-describing" >> { + "return a FailureDetails.SchemaViolation.IgluError containing a ValidationError if the JSON in .data is not self-describing" >> { val input = new EnrichedEvent - input.setUnstruct_event(buildUnstruct(notIglu)) + val ue = buildUnstruct(notIglu) + val ueJson = ue.toJson + input.setUnstruct_event(ue) IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), _), None) => ok - case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, _), None) => - ko(s"IgluError [$ie] is not ValidationError") - case other => ko(s"[$other] is not an error with IgluError") + case Ior.Both(NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `unstructFieldName`, + `ueJson` + ), + _ + ), + None + ) => + ok + case other => ko(s"[$other] is not expected one") } } - "return a SchemaViolation.IgluError containing a ValidationError if the JSON in .data is not a valid SDJ" >> { + "return a FailureDetails.SchemaViolation.IgluError containing a ValidationError if the JSON in .data is not a valid SDJ" >> { val input = new EnrichedEvent + val json = invalidEmailSent.toJson input.setUnstruct_event(buildUnstruct(invalidEmailSent)) IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), _), None) => ok - case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, _), None) => - ko(s"IgluError [$ie] is not ValidationError") - case other => ko(s"[$other] is not an error with IgluError") + case Ior.Both(NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `unstructFieldName`, + `json` + ), + _ + ), + None + ) => + ok + case other => ko(s"[$other] is not expected one") } } - "return a SchemaViolation.IgluError containing a ResolutionError if the schema of the SDJ in .data can't be resolved" >> { + "return a FailureDetails.SchemaViolation.IgluError containing a ResolutionError if the schema of the SDJ in .data can't be resolved" >> { val input = new EnrichedEvent + val json = noSchema.toJson input.setUnstruct_event(buildUnstruct(noSchema)) IgluUtils .extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_)), _), None) => ok - case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, _), None) => - ko(s"IgluError [$ie] is not a ResolutionError") - case other => ko(s"[$other] is not an error with IgluError") + case Ior.Both( + NonEmptyList( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), `unstructFieldName`, `json`), + _ + ), + None + ) => + ok + case other => ko(s"[$other] is not expected one") } } @@ -307,7 +365,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect } } - "return a SchemaViolation.NotJson if .contexts does not contain a properly formatted JSON string" >> { + "return a FailureDetails.SchemaViolation.NotJson if .contexts does not contain a properly formatted JSON string" >> { val input = new EnrichedEvent input.setContexts(notJson) @@ -315,89 +373,122 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotJson, Nil), Nil) => ok + case Ior.Both( + NonEmptyList(Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotJson, `contextsFieldName`, `jsonNotJson`), Nil), + Nil + ) => + ok case other => ko(s"[$other] is not an error with NotJson") } } - "return a SchemaViolation.NotIglu if .contexts contains a properly formatted JSON string that is not self-describing" >> { + "return a FailureDetails.SchemaViolation.NotIglu if .contexts contains a properly formatted JSON string that is not self-describing" >> { val input = new EnrichedEvent + val json = notIglu.toJson input.setContexts(notIglu) IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.NotIglu, Nil), Nil) => ok + case Ior.Both( + NonEmptyList(Failure.SchemaViolation(_: FailureDetails.SchemaViolation.NotIglu, `contextsFieldName`, `json`), Nil), + Nil + ) => + ok case other => ko(s"[$other] is not an error with NotIglu") } } - "return a SchemaViolation.CriterionMismatch if .contexts contains a self-describing JSON but not with the right schema" >> { + "return a FailureDetails.SchemaViolation.CriterionMismatch if .contexts contains a self-describing JSON but not with the right schema" >> { val input = new EnrichedEvent + val json = noSchema.toJson input.setContexts(noSchema) IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.CriterionMismatch, Nil), Nil) => ok + case Ior.Both(NonEmptyList( + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.CriterionMismatch, `contextsFieldName`, `json`), + Nil + ), + Nil + ) => + ok case other => ko(s"[$other] is not an error with CriterionMismatch") } } - "return a SchemaViolation.IgluError containing a ValidationError if .data does not contain an array of JSON objects" >> { + "return a FailureDetails.SchemaViolation.IgluError containing a ValidationError if .data does not contain an array of JSON objects" >> { val input = new EnrichedEvent val notArrayContexts = s"""{"schema": "${inputContextsSchema.toSchemaUri}", "data": ${emailSent1}}""" + val json = notArrayContexts.toJson input.setContexts(notArrayContexts) IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), Nil), Nil) => + case Ior.Both( + NonEmptyList( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), `contextsFieldName`, `json`), + Nil + ), + Nil + ) => ok - case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, Nil), Nil) => - ko(s"IgluError [$ie] is not ValidationError") - case other => ko(s"[$other] is not an error with IgluError") + case other => ko(s"[$other] is not expected one") } } - "return a SchemaViolation.IgluError containing a ValidationError if .data contains one invalid context" >> { + "return a FailureDetails.SchemaViolation.IgluError containing a ValidationError if .data contains one invalid context" >> { val input = new EnrichedEvent + val json = invalidEmailSent.toJson input.setContexts(buildInputContexts(List(invalidEmailSent))) IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), Nil), Nil) => + case Ior.Both( + NonEmptyList( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), `contextsFieldName`, `json`), + Nil + ), + Nil + ) => ok - case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, Nil), Nil) => - ko(s"IgluError [$ie] is not ValidationError") - case other => ko(s"[$other] is not an error with IgluError") + case other => ko(s"[$other] is not expected one") } } - "return a SchemaViolation.IgluError containing a ResolutionError if .data contains one context whose schema can't be resolved" >> { + "return a FailureDetails.SchemaViolation.IgluError containing a ResolutionError if .data contains one context whose schema can't be resolved" >> { val input = new EnrichedEvent + val json = noSchema.toJson input.setContexts(buildInputContexts(List(noSchema))) IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_)), Nil), Nil) => + case Ior.Both( + NonEmptyList( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), `contextsFieldName`, `json`), + Nil + ), + Nil + ) => ok - case Ior.Both(NonEmptyList(ie: FailureDetails.SchemaViolation.IgluError, Nil), Nil) => - ko(s"IgluError [$ie] is not ResolutionError") - case other => ko(s"[$other] is not an error with IgluError") + case other => ko(s"[$other] is not expected one") } } "return 2 expected failures for 2 invalid contexts" >> { val input = new EnrichedEvent + val invalidEmailSentJson = invalidEmailSent.toJson + val noSchemaJson = noSchema.toJson input.setContexts(buildInputContexts(List(invalidEmailSent, noSchema))) IgluUtils @@ -405,8 +496,16 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both(NonEmptyList( - FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), - List(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_))) + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `contextsFieldName`, + `invalidEmailSentJson` + ), + List( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), + `contextsFieldName`, + `noSchemaJson` + ) + ) ), Nil ) => @@ -417,14 +516,19 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect "return an expected failure and an expected SDJ if one context is invalid and one is valid" >> { val input = new EnrichedEvent + val noSchemaJson = noSchema.toJson input.setContexts(buildInputContexts(List(emailSent1, noSchema))) IgluUtils .extractAndValidateInputContexts(input, SpecHelpers.client, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(_: FailureDetails.SchemaViolation.IgluError, Nil), List(extract)) - if extract.sdj.schema == emailSentSchema => + case Ior.Both(NonEmptyList( + Failure.SchemaViolation(_: FailureDetails.SchemaViolation.IgluError, `contextsFieldName`, `noSchemaJson`), + Nil + ), + List(extract) + ) if extract.sdj.schema == emailSentSchema => ok case other => ko(s"[$other] is not one IgluError and one SDJ with schema $emailSentSchema") } @@ -478,6 +582,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect "validateEnrichmentsContexts" should { "return one expected SchemaViolation for one invalid context" >> { + val json = invalidEmailSent.toJson val contexts = List( SpecHelpers.jsonStringToSDJ(invalidEmailSent).right.get ) @@ -486,12 +591,23 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .validateEnrichmentsContexts(SpecHelpers.client, contexts, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), Nil), Nil) => ok + case Ior.Both( + NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `derivedContextsFieldName`, + `json` + ), + Nil + ), + Nil + ) => + ok case other => ko(s"[$other] is not one ValidationError") } } "return two expected SchemaViolation for two invalid contexts" >> { + val invalidEmailSentJson = invalidEmailSent.toJson + val noSchemaJson = noSchema.toJson val contexts = List( SpecHelpers.jsonStringToSDJ(invalidEmailSent).right.get, SpecHelpers.jsonStringToSDJ(noSchema).right.get @@ -501,8 +617,17 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .validateEnrichmentsContexts(SpecHelpers.client, contexts, SpecHelpers.registryLookup) .value .map { - case Ior.Both(NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), - List(FailureDetails.SchemaViolation.IgluError(_, ResolutionError(_))) + case Ior.Both(NonEmptyList( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `derivedContextsFieldName`, + `invalidEmailSentJson` + ), + List( + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ResolutionError), + `derivedContextsFieldName`, + `noSchemaJson` + ) + ) ), Nil ) => @@ -512,6 +637,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect } "return one expected SchemaViolation for one invalid context and one valid" >> { + val invalidEmailSentJson = invalidEmailSent.toJson val contexts = List( SpecHelpers.jsonStringToSDJ(invalidEmailSent).right.get, SpecHelpers.jsonStringToSDJ(emailSent1).right.get @@ -522,7 +648,10 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both(NonEmptyList( - FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), + Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `derivedContextsFieldName`, + `invalidEmailSentJson` + ), Nil ), List(sdj) @@ -563,7 +692,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .map { case Ior.Both( NonEmptyList( - _: FailureDetails.SchemaViolation, + _: Failure.SchemaViolation, Nil ), IgluUtils.EventExtractResult(Nil, None, Nil) @@ -587,7 +716,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .map { case Ior.Both( NonEmptyList( - _: FailureDetails.SchemaViolation, + _: Failure.SchemaViolation, Nil ), IgluUtils.EventExtractResult(Nil, None, Nil) @@ -612,8 +741,8 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .map { case Ior.Both( NonEmptyList( - _: FailureDetails.SchemaViolation, - List(_: FailureDetails.SchemaViolation) + _: Failure.SchemaViolation, + List(_: Failure.SchemaViolation) ), IgluUtils.EventExtractResult(Nil, None, Nil) ) => @@ -649,6 +778,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect "return the SchemaViolation of the invalid context in the Left and the extracted unstructured event in the Right" >> { val input = new EnrichedEvent + val invalidEmailSentJson = invalidEmailSent.toJson input.setUnstruct_event(buildUnstruct(emailSent1)) input.setContexts(buildInputContexts(List(invalidEmailSent))) @@ -661,7 +791,12 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both( - NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), _), + NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `contextsFieldName`, + `invalidEmailSentJson` + ), + _ + ), extract ) if extract.contexts.isEmpty && extract.unstructEvent.isDefined && extract.unstructEvent.get.schema == emailSentSchema => ok @@ -674,6 +809,7 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect "return the SchemaViolation of the invalid unstructured event in the Left and the valid context in the Right" >> { val input = new EnrichedEvent + val invalidEmailSentJson = invalidEmailSent.toJson input.setUnstruct_event(buildUnstruct(invalidEmailSent)) input.setContexts(buildInputContexts(List(emailSent1))) @@ -686,7 +822,12 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect .value .map { case Ior.Both( - NonEmptyList(FailureDetails.SchemaViolation.IgluError(_, ValidationError(_, _)), _), + NonEmptyList(Failure.SchemaViolation(FailureDetails.SchemaViolation.IgluError(_, _: ValidationError), + `unstructFieldName`, + `invalidEmailSentJson` + ), + _ + ), extract ) if extract.contexts.size == 1 && extract.contexts.head.schema == emailSentSchema && extract.unstructEvent.isEmpty => ok @@ -702,12 +843,11 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect input.setUnstruct_event(buildUnstruct(supersedingExample1)) input.setContexts(buildInputContexts(List(supersedingExample1, supersedingExample2))) - val expectedValidationInfoContext = parse( + val expectedValidationInfoContext = """ { | "originalSchema" : "iglu:com.acme/superseding_example/jsonschema/1-0-0", | "validatedWith" : "1-0-1" - |}""".stripMargin - ).toOption.get + |}""".stripMargin.toJson IgluUtils .extractAndValidateInputJsons( @@ -737,4 +877,8 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect def buildInputContexts(sdjs: List[String] = List.empty[String]) = s"""{"schema": "${inputContextsSchema.toSchemaUri}", "data": [${sdjs.mkString(",")}]}""" + + implicit class StringToJson(str: String) { + def toJson: Json = parse(str).toOption.get + } }