Skip to content

Commit

Permalink
Address Ben's comments - 2
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Apr 3, 2024
1 parent d1eb35e commit 247542f
Show file tree
Hide file tree
Showing 10 changed files with 428 additions and 518 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package com.snowplowanalytics.snowplow.enrich.common.enrichments

import cats.data.NonEmptyList

import io.circe.Json
import io.circe.syntax._

import com.snowplowanalytics.snowplow.badrows.FailureDetails
Expand Down Expand Up @@ -136,12 +135,12 @@ object AtomicFields {
AtomicFields(withLimits)
}

def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): FailureEntity.SchemaViolation = {
def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): Failure.SchemaViolation = {
val clientError = ValidationError(ValidatorError.InvalidData(errors), None)

val failureData = Json.obj(errors.toList.flatMap(e => e.path.map(p => p := e.keyword)): _*)
val failureData = errors.toList.flatMap(e => e.path.map(p => p := e.keyword)).toMap.asJson

FailureEntity.SchemaViolation(
Failure.SchemaViolation(
schemaViolation = FailureDetails.SchemaViolation.IgluError(
AtomicFields.atomicSchema,
clientError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object AtomicFieldsLengthValidator {
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): IorT[F, FailureEntity.SchemaViolation, Unit] =
): IorT[F, Failure.SchemaViolation, Unit] =
IorT {
atomicFields.value
.map(validateField(event, _).toValidatedNel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.badrows._
import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Payload, Processor}
import com.snowplowanalytics.snowplow.badrows.{Failure => BadRowFailure}

import com.snowplowanalytics.snowplow.enrich.common.{EtlPipeline, QueryStringParameters, RawEventParameters}
import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
Expand All @@ -46,18 +46,6 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.{IgluUtils, Conversion

object EnrichmentManager {

// Shape of the enrichment workflow result: bad rows (if any) on the left side of the Ior,
// the enriched event paired with a list of self-describing contexts on the right side.
private type EnrichmentResult[F[_]] =
IorT[F, NonEmptyList[IntermediateBadRow], (EnrichedEvent, List[SelfDescribingData[Json]])]

// We need this intermediate representation because the partially enriched event has to be
// created right after an enrichment/validation step has completed. If we created it only
// at the end instead, the partially enriched event could contain updates made by
// later steps.
private case class IntermediateBadRow(
// failures reported by the step
failureEntities: NonEmptyList[FailureEntity],
// partially enriched event captured together with those failures
partiallyEnrichedEvent: Payload.PartiallyEnrichedEvent
)

/**
* Run the enrichment workflow
* @param registry Contain configuration for all enrichments to apply
Expand All @@ -83,96 +71,113 @@ object EnrichmentManager {
atomicFields: AtomicFields,
emitIncomplete: Boolean
): IorT[F, BadRow, EnrichedEvent] = {
val iorT: EnrichmentResult[F] = for {
enriched <- IorT.pure[F, NonEmptyList[IntermediateBadRow]](new EnrichedEvent)
extractResult <- mapAndValidateInput(
raw,
enriched,
etlTstamp,
processor,
client,
registryLookup
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
// Next 2 lines remove the invalid contexts and the invalid unstructured event from the event.
// This should be done after the bad row was created and only if emitIncomplete is enabled.
_ = {
enriched.contexts = ME.formatContexts(extractResult.contexts).orNull
enriched.unstruct_event = ME.formatUnstructEvent(extractResult.unstructEvent).orNull
}
enrichmentsContexts <- runEnrichments(
registry,
raw,
enriched,
extractResult.contexts,
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
validContexts <- validateEnriched(
enriched,
enrichmentsContexts,
client,
registryLookup,
featureFlags.acceptInvalid,
invalidCount,
atomicFields
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
} yield (enriched, validContexts ::: extractResult.validationInfoContexts)
/**
 * Runs the enrichment stages in order: input mapping and validation, the enrichments
 * themselves, then validation of the enriched event.
 * @param enriched event populated by the stages; it is mutated in place
 * @return failures on the left side, and on the right side the derived contexts
 *         (validated enrichment contexts followed by the validation info contexts
 *         extracted from the input)
 */
def enrich(enriched: EnrichedEvent): IorT[F, NonEmptyList[Failure], List[SelfDescribingData[Json]]] =
for {
// Stage 1: fill the event from the raw input and validate the input JSONs
extractResult <- mapAndValidateInput(
raw,
enriched,
etlTstamp,
processor,
client,
registryLookup
)
.possiblyExitingEarly(emitIncomplete)
// Next 2 lines remove the invalid contexts and the invalid unstructured event from the event.
// This should be done after the bad row was created and only if emitIncomplete is enabled.
_ = {
enriched.contexts = ME.formatContexts(extractResult.contexts).orNull
enriched.unstruct_event = ME.formatUnstructEvent(extractResult.unstructEvent).orNull
}
// Stage 2: run the enrichments on the contexts and unstructured event kept by stage 1
enrichmentsContexts <- runEnrichments(
registry,
raw,
enriched,
extractResult.contexts,
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder
)
.possiblyExitingEarly(emitIncomplete)
// Stage 3: validate the contexts produced by the enrichments and the atomic fields lengths
validContexts <- validateEnriched(
enriched,
enrichmentsContexts,
client,
registryLookup,
featureFlags.acceptInvalid,
invalidCount,
atomicFields
)
.possiblyExitingEarly(emitIncomplete)
// Validated enrichment contexts come first, validation info contexts are appended after them
derivedContexts = validContexts ::: extractResult.validationInfoContexts
} yield derivedContexts

// Derived contexts are set last because we also want to include the failure entities
// in them, and the failure entities are only available at the end
// of the enrichment process.
setDerivedContexts(iorT, processor)
.leftMap(createBadRow(_, RawEvent.toRawEvent(raw), processor))
.map(_._1)
IorT(
for {
enrichedEvent <- Sync[F].delay(new EnrichedEvent)
enrichmentResult <- enrich(enrichedEvent).value
now = Instant.now()
_ = setDerivedContexts(enrichedEvent, enrichmentResult, processor, now)
result = enrichmentResult
.leftMap { fe =>
createBadRow(
fe,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
processor,
now
)
}
.map(_ => enrichedEvent)
} yield result
)
}

private def createBadRow(
fe: NonEmptyList[IntermediateBadRow],
fe: NonEmptyList[Failure],
pe: Payload.PartiallyEnrichedEvent,
re: Payload.RawEvent,
processor: Processor
): BadRow = {
val intermediateBadRow = fe.head
intermediateBadRow.failureEntities.head match {
case h: FailureEntity.SchemaViolation =>
val sv = intermediateBadRow.failureEntities.tail.collect { case f: FailureEntity.SchemaViolation => f }
buildSchemaViolationsBadRow(NonEmptyList(h, sv), intermediateBadRow.partiallyEnrichedEvent, re, processor)
case h: FailureEntity.EnrichmentFailure =>
val ef = intermediateBadRow.failureEntities.tail.collect { case f: FailureEntity.EnrichmentFailure => f }
buildEnrichmentFailuresBadRow(NonEmptyList(h, ef), intermediateBadRow.partiallyEnrichedEvent, re, processor)
processor: Processor,
timestamp: Instant
): BadRow =
fe.head match {
case h: Failure.SchemaViolation =>
val sv = fe.tail.collect { case f: Failure.SchemaViolation => f }
BadRow.SchemaViolations(
processor,
BadRowFailure.SchemaViolations(timestamp, NonEmptyList(h, sv).map(_.schemaViolation)),
Payload.EnrichmentPayload(pe, re)
)
case h: Failure.EnrichmentFailure =>
val ef = fe.tail.collect { case f: Failure.EnrichmentFailure => f }
BadRow.EnrichmentFailures(
processor,
BadRowFailure.EnrichmentFailures(timestamp, NonEmptyList(h, ef).map(_.enrichmentFailure)),
Payload.EnrichmentPayload(pe, re)
)
}
}

private def setDerivedContexts[F[_]: Sync](enriched: EnrichmentResult[F], processor: Processor): EnrichmentResult[F] =
IorT(
enriched.value.flatTap(v =>
Sync[F].delay {
val now = Instant.now()
val (derivedContexts, enriched) = v match {
case Ior.Right((e, l)) => (l, e.some)
case Ior.Left(l) => (convertFailureEntitiesToSDJ(l, now, processor), None)
case Ior.Both(b, (e, l)) => (l ::: convertFailureEntitiesToSDJ(b, now, processor), e.some)
}
for {
c <- ME.formatContexts(derivedContexts)
e <- enriched
_ = e.derived_contexts = c
} yield ()
}
)
/**
 * Writes the derived contexts of the event, including the failures converted to
 * self-describing entities when there are any.
 * @param enriched event whose `derived_contexts` field is set in place
 * @param enrichmentResult outcome of the enrichment: failures only, contexts only, or both
 * @param processor passed on to the failure conversion
 * @param timestamp passed on to the failure conversion
 */
private def setDerivedContexts(
  enriched: EnrichedEvent,
  enrichmentResult: Ior[NonEmptyList[Failure], List[SelfDescribingData[Json]]],
  processor: Processor,
  timestamp: Instant
): Unit = {
  def failuresAsContexts(failures: NonEmptyList[Failure]): List[SelfDescribingData[Json]] =
    convertFailureEntitiesToSDJ(failures, timestamp, processor)

  val allContexts = enrichmentResult match {
    case Ior.Left(failures) => failuresAsContexts(failures)
    case Ior.Right(contexts) => contexts
    // Regular contexts first, failure entities appended after them
    case Ior.Both(failures, contexts) => contexts ::: failuresAsContexts(failures)
  }

  // The field is left untouched when formatContexts yields nothing
  ME.formatContexts(allContexts).foreach { formatted =>
    enriched.derived_contexts = formatted
  }
}

private def convertFailureEntitiesToSDJ(
l: NonEmptyList[IntermediateBadRow],
l: NonEmptyList[Failure],
timestamp: Instant,
processor: Processor
): List[SelfDescribingData[Json]] =
l.flatMap(_.failureEntities).map(FailureEntity.toSDJ(_, timestamp, processor)).toList
l.map(_.toSDJ(timestamp, processor)).toList

private def mapAndValidateInput[F[_]: Sync](
raw: RawEvent,
Expand All @@ -181,18 +186,15 @@ object EnrichmentManager {
processor: Processor,
client: IgluCirceClient[F],
registryLookup: RegistryLookup[F]
): IorT[F, IntermediateBadRow, IgluUtils.EventExtractResult] = {
val iorT = for {
): IorT[F, NonEmptyList[Failure], IgluUtils.EventExtractResult] =
for {
_ <- setupEnrichedEvent[F](raw, enrichedEvent, etlTstamp, processor)
.leftMap(NonEmptyList.one)
extract <- IgluUtils
.extractAndValidateInputJsons(enrichedEvent, client, registryLookup)
.leftMap { l: NonEmptyList[FailureEntity] => l }
.leftMap { l: NonEmptyList[Failure] => l }
} yield extract

iorT.leftMap(v => IntermediateBadRow(v, EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent)))
}

/**
* Run all the enrichments
* @param enriched /!\ MUTABLE enriched event, mutated IN-PLACE /!\
Expand All @@ -206,7 +208,7 @@ object EnrichmentManager {
inputContexts: List[SelfDescribingData[Json]],
unstructEvent: Option[SelfDescribingData[Json]],
legacyOrder: Boolean
): IorT[F, IntermediateBadRow, List[SelfDescribingData[Json]]] =
): IorT[F, NonEmptyList[Failure], List[SelfDescribingData[Json]]] =
IorT {
accState(registry, raw, inputContexts, unstructEvent, legacyOrder)
.runS(Accumulation(enriched, Nil, Nil))
Expand All @@ -215,7 +217,7 @@ object EnrichmentManager {
failures.toNel match {
case Some(nel) =>
Ior.both(
IntermediateBadRow(nel.map(FailureEntity.EnrichmentFailure), EnrichedEvent.toPartiallyEnrichedEvent(enriched)),
nel.map(Failure.EnrichmentFailure),
contexts
)
case None =>
Expand All @@ -232,17 +234,14 @@ object EnrichmentManager {
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): IorT[F, IntermediateBadRow, List[SelfDescribingData[Json]]] = {
val iorT = for {
): IorT[F, NonEmptyList[Failure], List[SelfDescribingData[Json]]] =
for {
validContexts <- IgluUtils.validateEnrichmentsContexts[F](client, enrichmentsContexts, registryLookup)
_ <- AtomicFieldsLengthValidator
.validate[F](enriched, acceptInvalid, invalidCount, atomicFields)
.leftMap { v: FailureEntity => NonEmptyList.one(v) }
.leftMap { v: Failure => NonEmptyList.one(v) }
} yield validContexts

iorT.leftMap(v => IntermediateBadRow(v, EnrichedEvent.toPartiallyEnrichedEvent(enriched)))
}

private[enrichments] case class Accumulation(
event: EnrichedEvent,
errors: List[FailureDetails.EnrichmentFailure],
Expand Down Expand Up @@ -369,7 +368,7 @@ object EnrichmentManager {
e: EnrichedEvent,
etlTstamp: DateTime,
processor: Processor
): IorT[F, FailureEntity.SchemaViolation, Unit] =
): IorT[F, Failure.SchemaViolation, Unit] =
IorT {
Sync[F].delay {
e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter
Expand Down Expand Up @@ -880,34 +879,6 @@ object EnrichmentManager {
}
}

/**
 * Builds a schema violations bad row stamped with the current time.
 * @param fe schema violation failures to report
 * @param pee partially enriched event put in the payload
 * @param re raw event put in the payload
 * @param processor processor recorded in the bad row
 */
private def buildSchemaViolationsBadRow(
  fe: NonEmptyList[FailureEntity.SchemaViolation],
  pee: Payload.PartiallyEnrichedEvent,
  re: Payload.RawEvent,
  processor: Processor
): BadRow =
  BadRow.SchemaViolations(
    processor,
    Failure.SchemaViolations(Instant.now(), fe.map(violation => violation.schemaViolation)),
    Payload.EnrichmentPayload(pee, re)
  )

/**
 * Builds an enrichment failures bad row stamped with the current time.
 * @param fe enrichment failures to report
 * @param pee partially enriched event put in the payload
 * @param re raw event put in the payload
 * @param processor processor recorded in the bad row
 */
private def buildEnrichmentFailuresBadRow(
  fe: NonEmptyList[FailureEntity.EnrichmentFailure],
  pee: Payload.PartiallyEnrichedEvent,
  re: Payload.RawEvent,
  processor: Processor
): BadRow =
  BadRow.EnrichmentFailures(
    processor,
    Failure.EnrichmentFailures(Instant.now(), fe.map(failure => failure.enrichmentFailure)),
    Payload.EnrichmentPayload(pee, re)
  )

private implicit class IorTOps[F[_], A, B](val iorT: IorT[F, A, B]) extends AnyVal {

/** If the incomplete events feature is disabled, then convert a Both to a Left, so we don't waste time with next steps */
Expand Down
Loading

0 comments on commit 247542f

Please sign in to comment.