Skip to content

Commit

Permalink
Address Ben's comments - 3
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Apr 4, 2024
1 parent 247542f commit 23b1237
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object EnrichmentManager {
atomicFields: AtomicFields,
emitIncomplete: Boolean
): IorT[F, BadRow, EnrichedEvent] = {
def enrich(enriched: EnrichedEvent): IorT[F, NonEmptyList[Failure], List[SelfDescribingData[Json]]] =
def enrich(enriched: EnrichedEvent): IorT[F, NonEmptyList[NonEmptyList[Failure]], List[SelfDescribingData[Json]]] =
for {
extractResult <- mapAndValidateInput(
raw,
Expand All @@ -81,6 +81,7 @@ object EnrichmentManager {
client,
registryLookup
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
// Next 2 lines remove the invalid contexts and the invalid unstructured event from the event.
// This should be done after the bad row was created and only if emitIncomplete is enabled.
Expand All @@ -96,6 +97,7 @@ object EnrichmentManager {
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
validContexts <- validateEnriched(
enriched,
Expand All @@ -106,6 +108,7 @@ object EnrichmentManager {
invalidCount,
atomicFields
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
derivedContexts = validContexts ::: extractResult.validationInfoContexts
} yield derivedContexts
Expand All @@ -118,15 +121,15 @@ object EnrichmentManager {
enrichedEvent <- Sync[F].delay(new EnrichedEvent)
enrichmentResult <- enrich(enrichedEvent).value
now = Instant.now()
_ = setDerivedContexts(enrichedEvent, enrichmentResult, processor, now)
_ = setDerivedContexts(enrichedEvent, enrichmentResult, now, processor)
result = enrichmentResult
.leftMap { fe =>
createBadRow(
fe,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
processor,
now
now,
processor
)
}
.map(_ => enrichedEvent)
Expand All @@ -135,50 +138,44 @@ object EnrichmentManager {
}

private def createBadRow(
fe: NonEmptyList[Failure],
fe: NonEmptyList[NonEmptyList[Failure]],
pe: Payload.PartiallyEnrichedEvent,
re: Payload.RawEvent,
processor: Processor,
timestamp: Instant
): BadRow =
fe.head match {
timestamp: Instant,
processor: Processor
): BadRow = {
val firstList = fe.head
firstList.head match {
case h: Failure.SchemaViolation =>
val sv = fe.tail.collect { case f: Failure.SchemaViolation => f }
val sv = firstList.tail.collect { case f: Failure.SchemaViolation => f }
BadRow.SchemaViolations(
processor,
BadRowFailure.SchemaViolations(timestamp, NonEmptyList(h, sv).map(_.schemaViolation)),
Payload.EnrichmentPayload(pe, re)
)
case h: Failure.EnrichmentFailure =>
val ef = fe.tail.collect { case f: Failure.EnrichmentFailure => f }
val ef = firstList.tail.collect { case f: Failure.EnrichmentFailure => f }
BadRow.EnrichmentFailures(
processor,
BadRowFailure.EnrichmentFailures(timestamp, NonEmptyList(h, ef).map(_.enrichmentFailure)),
Payload.EnrichmentPayload(pe, re)
)
}
}

private def setDerivedContexts(
def setDerivedContexts(
enriched: EnrichedEvent,
enrichmentResult: Ior[NonEmptyList[Failure], List[SelfDescribingData[Json]]],
processor: Processor,
timestamp: Instant
enrichmentResult: Ior[NonEmptyList[NonEmptyList[Failure]], List[SelfDescribingData[Json]]],
timestamp: Instant,
processor: Processor
): Unit = {
val derivedContexts = enrichmentResult.fold(
l => convertFailureEntitiesToSDJ(l, timestamp, processor),
identity,
{ case (b, l) => l ::: convertFailureEntitiesToSDJ(b, timestamp, processor) }
)
val derivedContexts = enrichmentResult.leftMap { ll =>
ll.flatten.toList
.map(_.toSDJ(timestamp, processor))
}.merge
ME.formatContexts(derivedContexts).foreach(c => enriched.derived_contexts = c)
}

private def convertFailureEntitiesToSDJ(
l: NonEmptyList[Failure],
timestamp: Instant,
processor: Processor
): List[SelfDescribingData[Json]] =
l.map(_.toSDJ(timestamp, processor)).toList

private def mapAndValidateInput[F[_]: Sync](
raw: RawEvent,
enrichedEvent: EnrichedEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments

import java.time.Instant

import org.apache.commons.codec.digest.DigestUtils

import org.specs2.mutable.Specification
Expand Down Expand Up @@ -1273,7 +1275,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE
def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = {
val emailSentSDJ = SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get
SpecHelpers.listContexts(enriched.derived_contexts) match {
case List(`emailSentSDJ`, SelfDescribingData(Failure.`failureSchemaKey`, feJson))
case List(SelfDescribingData(Failure.`failureSchemaKey`, feJson), `emailSentSDJ`)
if feJson.field("failureType") == "ValidationError".asJson &&
feJson.field("errors") == Json.arr(
Json.obj(
Expand Down Expand Up @@ -1362,7 +1364,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE
def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = {
val emailSentSDJ = SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get
SpecHelpers.listContexts(enriched.derived_contexts) match {
case List(`emailSentSDJ`, SelfDescribingData(Failure.`failureSchemaKey`, feJson))
case List(SelfDescribingData(Failure.`failureSchemaKey`, feJson), `emailSentSDJ`)
if feJson.field("failureType") == "ValidationError".asJson &&
feJson.field("errors") == Json.arr(
Json.obj(
Expand Down Expand Up @@ -1461,7 +1463,7 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE
def expectedDerivedContexts(enriched: EnrichedEvent): Boolean = {
val emailSentSDJ = SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get
SpecHelpers.listContexts(enriched.derived_contexts) match {
case List(`emailSentSDJ`, SelfDescribingData(Failure.`failureSchemaKey`, feJson))
case List(SelfDescribingData(Failure.`failureSchemaKey`, feJson), `emailSentSDJ`)
if feJson.field("failureType") == "ValidationError".asJson &&
feJson.field("errors") == Json.arr(
Json.obj(
Expand Down Expand Up @@ -1543,8 +1545,8 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE
def expectedDerivedContexts(enriched: EnrichedEvent): Boolean =
SpecHelpers.listContexts(enriched.derived_contexts) match {
case List(
SelfDescribingData(SchemaKey("nl.basjes", "yauaa_context", "jsonschema", _), _),
SelfDescribingData(Failure.`failureSchemaKey`, feJson)
SelfDescribingData(Failure.`failureSchemaKey`, feJson),
SelfDescribingData(SchemaKey("nl.basjes", "yauaa_context", "jsonschema", _), _)
)
if feJson.field("failureType") == "EnrichmentError: Javascript enrichment".asJson &&
feJson.field("errors") == Json.arr(
Expand Down Expand Up @@ -1704,8 +1706,8 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE
def expectedDerivedContexts(enriched: EnrichedEvent): Boolean =
SpecHelpers.listContexts(enriched.derived_contexts) match {
case List(
SelfDescribingData(SchemaKey("nl.basjes", "yauaa_context", "jsonschema", _), _),
SelfDescribingData(Failure.`failureSchemaKey`, feJson)
SelfDescribingData(Failure.`failureSchemaKey`, feJson),
SelfDescribingData(SchemaKey("nl.basjes", "yauaa_context", "jsonschema", _), _)
)
if feJson.field("failureType") == "ValidationError".asJson &&
feJson.field("errors") == Json.arr(
Expand Down Expand Up @@ -1750,6 +1752,92 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE
case other => ko(s"[$other] is not a SchemaViolations bad row and an enriched event without the faulty enrichment context")
}
}

// Scenario: both the unstructured event and a context produced by the JavaScript enrichment
// use payloads named "invalid", and the event is enriched with emitIncomplete = true.
// The example passes only when the result is an Ior.Both whose bad row carries a single
// schema violation and whose enriched event has two failure entities in derived_contexts.
"return a bad row that contains validation errors only from ue if there is validation error in both ue and derived contexts when emitIncomplete is set to true" >> {
// Context returned by the JavaScript enrichment below; per its name it is meant to fail validation.
// NOTE(review): the "[email protected]" values here and in invalidUe look like scrape-time
// email obfuscation rather than the original fixture data - confirm against the repository.
val invalidContext =
"""
{
"schema":"iglu:com.acme/email_sent/jsonschema/1-0-0",
"data": {
"foo": "[email protected]",
"emailAddress2": "[email protected]"
}
}"""
// Unstructured event payload for client_session 1-0-1 carrying a field named "unallowedAdditionalField".
val invalidUe =
"""{
"schema":"iglu:com.snowplowanalytics.snowplow/client_session/jsonschema/1-0-1",
"data": {
"unallowedAdditionalField": "[email protected]"
}
}"""
// JavaScript enrichment source: process(event) returns an array holding only invalidContext.
val script =
s"""
function process(event) {
return [
$invalidContext
];
}"""
// Schema key handed to the JavascriptScriptEnrichment constructor.
val schemaKey = SchemaKey(
"com.snowplowanalytics.snowplow",
"javascript_script_config",
"jsonschema",
SchemaVer.Full(1, 0, 0)
)
// Registry with the JavaScript enrichment as the only enrichment configured here.
val enrichmentReg = EnrichmentRegistry[IO](
javascriptScript = List(
JavascriptScriptEnrichment(schemaKey, script)
)
)
// Raw event parameters; "ue_pr" wraps invalidUe in an unstruct_event 1-0-0 envelope.
val parameters = Map(
"e" -> "pp",
"tv" -> "js-0.13.1",
"p" -> "web",
"ue_pr" ->
s"""
{
"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0",
"data": $invalidUe
}"""
).toOpt
val rawEvent = RawEvent(api, parameters, None, source, context)
// Run the enrichment with emitIncomplete enabled.
val enriched = EnrichmentManager.enrichEvent[IO](
enrichmentReg,
client,
processor,
timestamp,
rawEvent,
AcceptInvalid.featureFlags,
IO.unit,
SpecHelpers.registryLookup,
atomicFieldLimits,
emitIncomplete = true
)
// True when exactly two derived contexts use the failure schema key
// (presumably one for the unstruct event and one for the enrichment context - confirm).
def expectedDerivedContexts(enriched: EnrichedEvent): Boolean =
SpecHelpers.listContextsSchemas(enriched.derived_contexts).count(_ == Failure.failureSchemaKey) == 2

// True when the bad row is a SchemaViolations row holding exactly one IgluError, matched against
// `clientSessionSchema` (defined outside this block; presumably the client_session key - confirm).
def expectedBadRow(badRow: BadRow): Boolean =
badRow match {
case BadRow.SchemaViolations(
_,
BadRowFailure.SchemaViolations(
_,
NonEmptyList(FailureDetails.SchemaViolation.IgluError(`clientSessionSchema`, _), Nil)
),
_
) =>
true
case _ => false
}

// Success requires Ior.Both, an unset unstruct_event on the enriched event, and both checks above.
enriched.value.map {
case Ior.Both(badRow, enriched)
if Option(enriched.unstruct_event).isEmpty &&
expectedDerivedContexts(enriched) &&
expectedBadRow(badRow) =>
ok
case other => ko(s"[$other] is not expected one")
}
}
}

"getCrossDomain" should {
Expand Down Expand Up @@ -2330,6 +2418,50 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers with CatsE
}
}
}

"setDerivedContexts" should {
  // Shared fixtures: one schema violation, one enrichment failure and one parsed context.
  val violation = Failure.SchemaViolation(
    schemaViolation = FailureDetails.SchemaViolation.NotJson("testField", "testValue".some, "testError"),
    source = "testSource",
    data = Json.obj("testKey" := "testValue")
  )
  val failure = Failure.EnrichmentFailure(
    FailureDetails.EnrichmentFailure(
      None,
      FailureDetails.EnrichmentFailureMessage.Simple("testError")
    )
  )
  val parsedContext = SelfDescribingData.parse[Json](jparse(emailSent).toOption.get).toOption.get
  val now = Instant.now()

  // Two groups of two failures each, i.e. four failures once flattened.
  val failureGroups: NonEmptyList[NonEmptyList[Failure]] =
    NonEmptyList.of(
      NonEmptyList.of(violation, failure),
      NonEmptyList.of(violation, failure)
    )
  // Two copies of the same context on the success side.
  val contexts: List[SelfDescribingData[Json]] = List(parsedContext, parsedContext)

  // Applies setDerivedContexts to a fresh event and returns the schemas found in its derived_contexts.
  def derivedSchemasFor(
    result: Ior[NonEmptyList[NonEmptyList[Failure]], List[SelfDescribingData[Json]]]
  ) = {
    val event = new EnrichedEvent()
    EnrichmentManager.setDerivedContexts(event, result, now, processor)
    SpecHelpers.listContextsSchemas(event.derived_contexts)
  }

  "set derived contexts correctly if enrichment result is Ior.Left" >> {
    // Only failures: every derived context must be a failure entity.
    val schemas = derivedSchemasFor(Ior.Left(failureGroups))
    schemas.size must beEqualTo(4)
    forall(schemas)(s => s must beEqualTo(Failure.failureSchemaKey))
  }

  "set derived contexts correctly if enrichment result is Ior.Right" >> {
    // Only contexts: every derived context must use the email_sent schema.
    val schemas = derivedSchemasFor(Ior.Right(contexts))
    schemas.size must beEqualTo(2)
    forall(schemas)(s => s must beEqualTo(emailSentSchema))
  }

  "set derived contexts correctly if enrichment result is Ior.Both" >> {
    // Failures and contexts together: both kinds must be present in the derived contexts.
    val schemas = derivedSchemasFor(Ior.Both(failureGroups, contexts))
    schemas.size must beEqualTo(6)
    schemas.count(_ == Failure.failureSchemaKey) must beEqualTo(4)
    schemas.count(_ == emailSentSchema) must beEqualTo(2)
  }
}
}

object EnrichmentManagerSpec {
Expand Down

0 comments on commit 23b1237

Please sign in to comment.