Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add failure entity for incomplete events #886

Merged
merged 2 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ package com.snowplowanalytics.snowplow.enrich.common.enrichments

import cats.data.NonEmptyList

import io.circe.syntax._

import com.snowplowanalytics.snowplow.badrows.FailureDetails

import com.snowplowanalytics.iglu.client.ClientError.ValidationError
Expand Down Expand Up @@ -133,12 +135,19 @@ object AtomicFields {
AtomicFields(withLimits)
}

def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): FailureDetails.SchemaViolation = {
def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): Failure.SchemaViolation = {
val clientError = ValidationError(ValidatorError.InvalidData(errors), None)

FailureDetails.SchemaViolation.IgluError(
AtomicFields.atomicSchema,
clientError
val failureData = errors.toList.flatMap(e => e.path.map(p => p := e.keyword)).toMap.asJson

Failure.SchemaViolation(
schemaViolation = FailureDetails.SchemaViolation.IgluError(
AtomicFields.atomicSchema,
clientError
),
// The source atomic field and its actual value should already be present in the ValidatorReport list
source = "atomic_field",
data = failureData
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import cats.implicits._

import com.snowplowanalytics.iglu.client.validator.ValidatorReport

import com.snowplowanalytics.snowplow.badrows.FailureDetails

import com.snowplowanalytics.snowplow.enrich.common.enrichments.AtomicFields.LimitedAtomicField
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent

Expand All @@ -37,7 +35,7 @@ object AtomicFieldsLengthValidator {
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): IorT[F, FailureDetails.SchemaViolation, Unit] =
): IorT[F, Failure.SchemaViolation, Unit] =
IorT {
atomicFields.value
.map(validateField(event, _).toValidatedNel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.badrows._
import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Payload, Processor}
import com.snowplowanalytics.snowplow.badrows.{Failure => BadRowFailure}

import com.snowplowanalytics.snowplow.enrich.common.{EtlPipeline, QueryStringParameters, RawEventParameters}
import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
Expand Down Expand Up @@ -71,52 +71,109 @@ object EnrichmentManager {
atomicFields: AtomicFields,
emitIncomplete: Boolean
): IorT[F, BadRow, EnrichedEvent] = {
val iorT: IorT[F, NonEmptyList[BadRow], EnrichedEvent] = for {
enriched <- IorT.pure[F, NonEmptyList[BadRow]](new EnrichedEvent)
extractResult <- mapAndValidateInput(
raw,
enriched,
etlTstamp,
processor,
client,
registryLookup
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
// Next 2 lines remove the invalid contexts and the invalid unstructured event from the event.
// This should be done after the bad row was created and only if emitIncomplete is enabled.
_ = {
enriched.contexts = ME.formatContexts(extractResult.contexts).orNull
enriched.unstruct_event = ME.formatUnstructEvent(extractResult.unstructEvent).orNull
}
enrichmentsContexts <- runEnrichments(
registry,
processor,
raw,
enriched,
extractResult.contexts,
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
_ <- validateEnriched(
enriched,
raw,
enrichmentsContexts,
extractResult.validationInfoContexts,
client,
processor,
registryLookup,
featureFlags.acceptInvalid,
invalidCount,
atomicFields
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
} yield enriched
/**
 * Runs the three enrichment stages in order against the given event:
 * input mapping/validation, the enrichments themselves, then validation of the enriched event.
 * Each stage's failures are wrapped with `NonEmptyList.one`, so the left side is a list of
 * per-stage failure lists.
 * After every stage `possiblyExitingEarly(emitIncomplete)` is applied, so later stages can be
 * skipped when incomplete events are not emitted.
 * @param enriched /!\ MUTABLE enriched event, mutated IN-PLACE /!\
 * @return on the right side, the validated enrichment contexts followed by
 *         `extractResult.validationInfoContexts`
 */
def enrich(enriched: EnrichedEvent): IorT[F, NonEmptyList[NonEmptyList[Failure]], List[SelfDescribingData[Json]]] =
for {
extractResult <- mapAndValidateInput(
raw,
enriched,
etlTstamp,
processor,
client,
registryLookup
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
// Next 2 lines remove the invalid contexts and the invalid unstructured event from the event.
// This should be done after the bad row was created and only if emitIncomplete is enabled.
_ = {
enriched.contexts = ME.formatContexts(extractResult.contexts).orNull
enriched.unstruct_event = ME.formatUnstructEvent(extractResult.unstructEvent).orNull
}
// The enrichments receive the contexts and unstructured event returned by the extraction step
enrichmentsContexts <- runEnrichments(
registry,
raw,
enriched,
extractResult.contexts,
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
// Validates the contexts produced by the enrichments and the atomic fields of the event
validContexts <- validateEnriched(
enriched,
enrichmentsContexts,
client,
registryLookup,
featureFlags.acceptInvalid,
invalidCount,
atomicFields
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
// Not written to the event here: the caller sets derived contexts once the final result is known
derivedContexts = validContexts ::: extractResult.validationInfoContexts
} yield derivedContexts

// Derived contexts are set last because failure entities must be included
// in them as well, and failure entities are only available at the end
// of the enrichment process
IorT(
for {
enrichedEvent <- Sync[F].delay(new EnrichedEvent)
enrichmentResult <- enrich(enrichedEvent).value
now = Instant.now()
_ = setDerivedContexts(enrichedEvent, enrichmentResult, now, processor)
result = enrichmentResult
.leftMap { fe =>
createBadRow(
fe,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
now,
processor
)
}
.map(_ => enrichedEvent)
} yield result
)
}

/**
 * Builds the bad row for a failed (or partially failed) enrichment.
 *
 * Only the first list of failures is considered, and the type of its first element
 * decides the kind of bad row. Failures of the other type in that list, and all
 * subsequent lists, are not included in the bad row.
 *
 * @param fe failures grouped in lists; only `fe.head` is used
 * @param pe partially enriched event put in the bad row payload
 * @param re raw event put in the bad row payload
 * @param timestamp timestamp attached to the failure
 * @param processor processor attached to the bad row
 * @return a `BadRow.SchemaViolations` or a `BadRow.EnrichmentFailures`
 */
private def createBadRow(
  fe: NonEmptyList[NonEmptyList[Failure]],
  pe: Payload.PartiallyEnrichedEvent,
  re: Payload.RawEvent,
  timestamp: Instant,
  processor: Processor
): BadRow = {
  val leading = fe.head
  val others = leading.tail
  val payload = Payload.EnrichmentPayload(pe, re)

  leading.head match {
    case first: Failure.SchemaViolation =>
      // Keep the first violation plus every other schema violation of the same list
      val violations = NonEmptyList(
        first.schemaViolation,
        others.collect { case sv: Failure.SchemaViolation => sv.schemaViolation }
      )
      BadRow.SchemaViolations(
        processor,
        BadRowFailure.SchemaViolations(timestamp, violations),
        payload
      )
    case first: Failure.EnrichmentFailure =>
      // Keep the first failure plus every other enrichment failure of the same list
      val failures = NonEmptyList(
        first.enrichmentFailure,
        others.collect { case ef: Failure.EnrichmentFailure => ef.enrichmentFailure }
      )
      BadRow.EnrichmentFailures(
        processor,
        BadRowFailure.EnrichmentFailures(timestamp, failures),
        payload
      )
  }
}

iorT.leftMap(_.head)
/**
 * Sets `derived_contexts` on the enriched event from the result of the enrichment process.
 * Failures on the left side are flattened and each one is converted with
 * `toSDJ(timestamp, processor)`; the result is then merged with the right side.
 * NOTE(review): for a `Both`, `merge` presumably concatenates the failure entities with the
 * contexts of the right side (cats `Ior.merge` with the `List` semigroup) - confirm.
 * The field is assigned only if `ME.formatContexts` yields a value.
 * @param enriched /!\ MUTABLE enriched event, mutated IN-PLACE /!\
 * @param enrichmentResult failures and/or derived contexts produced by the enrichment process
 * @param timestamp passed to `toSDJ` for each failure
 * @param processor passed to `toSDJ` for each failure
 */
def setDerivedContexts(
enriched: EnrichedEvent,
enrichmentResult: Ior[NonEmptyList[NonEmptyList[Failure]], List[SelfDescribingData[Json]]],
timestamp: Instant,
processor: Processor
): Unit = {
val derivedContexts = enrichmentResult.leftMap { ll =>
ll.flatten.toList
.map(_.toSDJ(timestamp, processor))
}.merge
Comment on lines +172 to +175
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice !

ME.formatContexts(derivedContexts).foreach(c => enriched.derived_contexts = c)
}

private def mapAndValidateInput[F[_]: Sync](
Expand All @@ -126,23 +183,15 @@ object EnrichmentManager {
processor: Processor,
client: IgluCirceClient[F],
registryLookup: RegistryLookup[F]
): IorT[F, BadRow, IgluUtils.EventExtractResult] = {
val iorT = for {
): IorT[F, NonEmptyList[Failure], IgluUtils.EventExtractResult] =
for {
_ <- setupEnrichedEvent[F](raw, enrichedEvent, etlTstamp, processor)
.leftMap(NonEmptyList.one)
extract <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, registryLookup)
extract <- IgluUtils
.extractAndValidateInputJsons(enrichedEvent, client, registryLookup)
.leftMap { l: NonEmptyList[Failure] => l }
} yield extract

iorT.leftMap { violations =>
buildSchemaViolationsBadRow(
violations,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
processor
)
}
}

/**
* Run all the enrichments
* @param enriched /!\ MUTABLE enriched event, mutated IN-PLACE /!\
Expand All @@ -151,13 +200,12 @@ object EnrichmentManager {
*/
private def runEnrichments[F[_]: Monad](
registry: EnrichmentRegistry[F],
processor: Processor,
raw: RawEvent,
enriched: EnrichedEvent,
inputContexts: List[SelfDescribingData[Json]],
unstructEvent: Option[SelfDescribingData[Json]],
legacyOrder: Boolean
): IorT[F, BadRow, List[SelfDescribingData[Json]]] =
): IorT[F, NonEmptyList[Failure], List[SelfDescribingData[Json]]] =
IorT {
accState(registry, raw, inputContexts, unstructEvent, legacyOrder)
.runS(Accumulation(enriched, Nil, Nil))
Expand All @@ -166,12 +214,7 @@ object EnrichmentManager {
failures.toNel match {
case Some(nel) =>
Ior.both(
buildEnrichmentFailuresBadRow(
nel,
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
),
nel.map(Failure.EnrichmentFailure),
contexts
)
case None =>
Expand All @@ -182,33 +225,19 @@ object EnrichmentManager {

private def validateEnriched[F[_]: Clock: Monad](
enriched: EnrichedEvent,
raw: RawEvent,
enrichmentsContexts: List[SelfDescribingData[Json]],
validationInfoContexts: List[SelfDescribingData[Json]],
client: IgluCirceClient[F],
processor: Processor,
registryLookup: RegistryLookup[F],
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): IorT[F, BadRow, Unit] = {
val iorT = for {
): IorT[F, NonEmptyList[Failure], List[SelfDescribingData[Json]]] =
for {
validContexts <- IgluUtils.validateEnrichmentsContexts[F](client, enrichmentsContexts, registryLookup)
_ = ME.formatContexts(validContexts ::: validationInfoContexts).foreach(enriched.derived_contexts = _)
_ <- AtomicFieldsLengthValidator
.validate[F](enriched, acceptInvalid, invalidCount, atomicFields)
.leftMap(NonEmptyList.one)
} yield ()

iorT.leftMap { violations =>
buildSchemaViolationsBadRow(
violations,
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
)
}
}
.leftMap { v: Failure => NonEmptyList.one(v) }
} yield validContexts

private[enrichments] case class Accumulation(
event: EnrichedEvent,
Expand Down Expand Up @@ -336,7 +365,7 @@ object EnrichmentManager {
e: EnrichedEvent,
etlTstamp: DateTime,
processor: Processor
): IorT[F, FailureDetails.SchemaViolation, Unit] =
): IorT[F, Failure.SchemaViolation, Unit] =
IorT {
Sync[F].delay {
e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter
Expand Down Expand Up @@ -847,30 +876,6 @@ object EnrichmentManager {
}
}

/**
 * Wraps schema violations into a `BadRow.SchemaViolations`.
 * The failure is timestamped with the current time at the moment of the call.
 *
 * @param vs schema violations to report
 * @param pee partially enriched event put in the bad row payload
 * @param re raw event put in the bad row payload
 * @param processor processor attached to the bad row
 */
private def buildSchemaViolationsBadRow(
  vs: NonEmptyList[FailureDetails.SchemaViolation],
  pee: Payload.PartiallyEnrichedEvent,
  re: Payload.RawEvent,
  processor: Processor
): BadRow.SchemaViolations = {
  val failure = Failure.SchemaViolations(Instant.now(), vs)
  val payload = Payload.EnrichmentPayload(pee, re)
  BadRow.SchemaViolations(processor, failure, payload)
}

/**
 * Wraps enrichment failures into a `BadRow.EnrichmentFailures`.
 * The failure is timestamped with the current time at the moment of the call.
 *
 * @param fs enrichment failures to report
 * @param pee partially enriched event put in the bad row payload
 * @param re raw event put in the bad row payload
 * @param processor processor attached to the bad row
 */
private def buildEnrichmentFailuresBadRow(
  fs: NonEmptyList[FailureDetails.EnrichmentFailure],
  pee: Payload.PartiallyEnrichedEvent,
  re: Payload.RawEvent,
  processor: Processor
) = {
  val failure = Failure.EnrichmentFailures(Instant.now(), fs)
  val payload = Payload.EnrichmentPayload(pee, re)
  BadRow.EnrichmentFailures(processor, failure, payload)
}

private implicit class IorTOps[F[_], A, B](val iorT: IorT[F, A, B]) extends AnyVal {

/** If the incomplete events feature is disabled, then convert a Both to a Left, so we don't waste time with next steps */
Expand Down
Loading
Loading