Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Mar 28, 2024
1 parent 0ee0621 commit 39fa968
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.{IgluUtils, Conversion

object EnrichmentManager {

private type EnrichmentResult[F[_]] = IorT[F, NonEmptyList[FailureEntity.BadRowWithFailureEntities], (EnrichedEvent, List[SelfDescribingData[Json]])]
private type EnrichmentResult[F[_]] =
IorT[F, NonEmptyList[FailureEntity.BadRowWithFailureEntities], (EnrichedEvent, List[SelfDescribingData[Json]])]

/**
* Run the enrichment workflow
Expand Down Expand Up @@ -103,18 +104,18 @@ object EnrichmentManager {
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
validContexts <- validateEnriched(
enriched,
raw,
enrichmentsContexts,
client,
processor,
registryLookup,
featureFlags.acceptInvalid,
invalidCount,
atomicFields
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
enriched,
raw,
enrichmentsContexts,
client,
processor,
registryLookup,
featureFlags.acceptInvalid,
invalidCount,
atomicFields
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
} yield (enriched, validContexts ::: extractResult.validationInfoContexts)

// derived contexts are set lastly because we want to include failure entities
Expand All @@ -125,18 +126,20 @@ object EnrichmentManager {

private def setDerivedContexts[F[_]: Sync](enriched: EnrichmentResult[F]): EnrichmentResult[F] =
IorT(
enriched.value.flatTap(v => Sync[F].delay {
val (derivedContexts, enriched) = v match {
case Ior.Right((e, l)) => (l, e.some)
case Ior.Left(l) => (extractFailureEntities(l), None)
case Ior.Both(b, (e, l)) => (l ::: extractFailureEntities(b), e.some)
enriched.value.flatTap(v =>
Sync[F].delay {
val (derivedContexts, enriched) = v match {
case Ior.Right((e, l)) => (l, e.some)
case Ior.Left(l) => (extractFailureEntities(l), None)
case Ior.Both(b, (e, l)) => (l ::: extractFailureEntities(b), e.some)
}
for {
c <- ME.formatContexts(derivedContexts)
e <- enriched
_ = e.derived_contexts = c
} yield ()
}
for {
c <- ME.formatContexts(derivedContexts)
e <- enriched
_ = e.derived_contexts = c
} yield ()
})
)
)

private def extractFailureEntities(l: NonEmptyList[FailureEntity.BadRowWithFailureEntities]): List[SelfDescribingData[Json]] =
Expand All @@ -157,12 +160,12 @@ object EnrichmentManager {
} yield extract

iorT.leftMap { violations =>
buildSchemaViolationsBadRow(
violations,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
processor
)
buildSchemaViolationsBadRow(
violations,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
processor
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,9 @@ object FailureEntity {
case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ResolutionError(lh)) =>
val message = s"Resolution error: schema ${schemaKey.toSchemaUri} not found"
val lookupHistory = lh.toList
.map { case (repo, lookups) =>
lookups.asJson.deepMerge(Json.obj("repository" := repo.asJson))
.map {
case (repo, lookups) =>
lookups.asJson.deepMerge(Json.obj("repository" := repo.asJson))
}
FailureEntity(
failureType = "ResolutionError",
Expand All @@ -176,9 +177,7 @@ object FailureEntity {
componentName = processor.artifact,
componentVersion = processor.version
)
case FailureDetails.SchemaViolation.IgluError(
schemaKey,
ClientError.ValidationError(ValidatorError.InvalidData(e), _)) =>
case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidData(e), _)) =>
val errors = e.toList.map { r =>
Json.obj(
"message" := r.message,
Expand All @@ -197,9 +196,7 @@ object FailureEntity {
componentName = processor.artifact,
componentVersion = processor.version
)
case FailureDetails.SchemaViolation.IgluError(
schemaKey,
ClientError.ValidationError(ValidatorError.InvalidSchema(e), _)) =>
case FailureDetails.SchemaViolation.IgluError(schemaKey, ClientError.ValidationError(ValidatorError.InvalidSchema(e), _)) =>
val errors = e.toList.map { r =>
Json.obj(
"message" := s"Invalid schema: $schemaKey - ${r.message}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect
.extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup)
.value
.map {
case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ValidationError), _, _), _), None) => ok
case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ValidationError), _, _), _), None) =>
ok
case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(ie: SchemaViolation.IgluError, _, _), _), None) =>
ko(s"IgluError [$ie] is not ValidationError")
case other => ko(s"[$other] is not an error with IgluError")
Expand All @@ -219,7 +220,8 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect
.extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup)
.value
.map {
case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ValidationError), _, _), _), None) => ok
case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ValidationError), _, _), _), None) =>
ok
case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(ie: SchemaViolation.IgluError, _, _), _), None) =>
ko(s"IgluError [$ie] is not ValidationError")
case other => ko(s"[$other] is not an error with IgluError")
Expand All @@ -234,7 +236,8 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect
.extractAndValidateUnstructEvent(input, SpecHelpers.client, SpecHelpers.registryLookup)
.value
.map {
case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ResolutionError), _, _), _), None) => ok
case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ResolutionError), _, _), _), None) =>
ok
case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(ie: SchemaViolation.IgluError, _, _), _), None) =>
ko(s"IgluError [$ie] is not a ResolutionError")
case other => ko(s"[$other] is not an error with IgluError")
Expand Down Expand Up @@ -488,7 +491,8 @@ class IgluUtilsSpec extends Specification with ValidatedMatchers with CatsEffect
.validateEnrichmentsContexts(SpecHelpers.client, contexts, SpecHelpers.registryLookup)
.value
.map {
case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ValidationError), _, _), Nil), Nil) => ok
case Ior.Both(NonEmptyList(SchemaViolationWithExtraContext(SchemaViolation.IgluError(_, _: ValidationError), _, _), Nil), Nil) =>
ok
case other => ko(s"[$other] is not one ValidationError")
}
}
Expand Down

0 comments on commit 39fa968

Please sign in to comment.