Commit

Add possibility to emit failed events in TSV format into a third stream (#872)

Before this change, any error in the enrichment workflow would short-circuit processing,
and the failed event would get emitted as JSON. After this change, if incomplete events
are enabled, enrichment runs to the end with whatever is possible,
accumulating errors as it goes. The errors get attached in derived_contexts
and a failed event gets emitted to a third stream in TSV format
(the same format as an enriched event).
benjben committed Jun 13, 2024
1 parent 52ad0a0 commit e11145d
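
Since an incomplete event is serialized exactly like an enriched event, downstream consumers can read the third stream with the usual tooling and inspect the accumulated errors in derived_contexts. A minimal sketch, assuming the Snowplow Scala Analytics SDK (Event.parse) and an illustrative "failure" entity name that this commit does not itself pin down:

    import io.circe.Json
    import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

    // Parse one TSV line from the incomplete stream (same format as enriched)
    // and return the payloads of any failure entities in derived_contexts.
    def failuresOf(tsvLine: String): List[Json] =
      Event.parse(tsvLine).toOption.toList.flatMap { event =>
        event.derived_contexts.data
          .filter(_.schema.name == "failure") // illustrative entity name
          .map(_.data)
      }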
Showing 71 changed files with 3,342 additions and 983 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -43,7 +43,7 @@ jobs:
env:
PGPASSWORD: supersecret1
- name: Run tests
- run: SBT_OPTS="-Xms1G -Xmx8G -Xss4M -XX:MaxMetaspaceSize=1024M" sbt coverage +test
+ run: SBT_OPTS="-Xms1G -Xmx8G -Xss4M -XX:MaxMetaspaceSize=1024M" TESTCONTAINERS_RYUK_DISABLED=true sbt coverage +test
env:
OER_KEY: ${{ secrets.OER_KEY }}
- name: Check Scala formatting
3 changes: 2 additions & 1 deletion build.sbt
@@ -100,7 +100,8 @@ lazy val kinesisDistroless = project
.settings(libraryDependencies ++= kinesisDependencies ++ Seq(
// integration tests dependencies
specs2CEIt,
- testContainersIt
+ testContainersIt,
+ dockerJavaIt
))
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
8 changes: 8 additions & 0 deletions config/config.file.extended.hocon
@@ -31,6 +31,14 @@
"file": "/var/bad"
"maxBytes": 1000000
}

# Incomplete events output
"incomplete": {
# Local FS supported for testing purposes
"type": "FileSystem"
"file": "/var/incomplete"
"maxBytes": 1000000
}
}

# Optional. Concurrency of the app
19 changes: 19 additions & 0 deletions config/config.kafka.extended.hocon
@@ -89,6 +89,25 @@
"acks": "all"
}
}

# Optional. Incomplete events output.
# If set, an incomplete enriched event holding the errors in derived_contexts will get emitted in addition to a bad row
"incomplete": {
"type": "Kafka"

# Name of the Kafka topic to write to
"topicName": "incomplete"

# A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
# This list should be in the form host1:port1,host2:port2,...
"bootstrapServers": "localhost:9092"

# Optional, Kafka producer configuration
# See https://kafka.apache.org/documentation/#producerconfigs for all properties
"producerConf": {
"acks": "all"
}
}
}

# Optional. Concurrency of the app
49 changes: 46 additions & 3 deletions config/config.kinesis.extended.hocon
@@ -94,7 +94,7 @@
# Otherwise, the partition key will be a random UUID.
# "partitionKey": "user_id"

- # Optional. Policy to retry if writing to kinesis fails with unexepected errors
+ # Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
@@ -144,7 +144,7 @@
# Otherwise, the partition key will be a random UUID.
# "partitionKey": "user_id"

- # Optional. Policy to retry if writing to kinesis fails with unexepcted errors
+ # Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
@@ -186,7 +186,50 @@
# https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
"region": "eu-central-1"

- # Optional. Policy to retry if writing to kinesis fails with unexepcted errors
+ # Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}

# Optional. Policy to retry if writing to kinesis exceeds the provisioned throughput.
"throttledBackoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 1 second
}

# Optional. Limits the number of events in a single PutRecords request.
# Several requests are made in parallel
# Maximum allowed: 500
"recordLimit": 500

# Optional. Limits the number of bytes in a single PutRecords request,
# including records and partition keys.
# Several requests are made in parallel
# Maximum allowed: 5 MB
"byteLimit": 5242880

# Optional. Use a custom Kinesis endpoint.
# Can be used for instance to work locally with localstack
# "customEndpoint": "https://localhost:4566"
}

# Optional. Incomplete events output.
# If set, an incomplete enriched event holding the errors in derived_contexts will get emitted in addition to a bad row
"incomplete": {
"type": "Kinesis"

# Name of the Kinesis stream to write to
"streamName": "incomplete"

# Optional. Region where the Kinesis stream is located
# This field is optional if it can be resolved with AWS region provider chain.
# It checks places like env variables, system properties, AWS profile file.
# https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
"region": "eu-central-1"

# Optional. Policy to retry if writing to kinesis fails with unexpected errors
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
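
All three backoff policies above describe a capped exponential retry: delays start at minBackoff, grow towards maxBackoff, and give up after maxRetries attempts. A quick illustrative sketch of the delays this implies, assuming the delay doubles between attempts (the exact growth and jitter used by enrich may differ):

    import scala.concurrent.duration._

    // Illustrative: the delay sequence of a capped exponential backoff
    def delays(min: FiniteDuration, max: FiniteDuration, maxRetries: Int): List[FiniteDuration] =
      List.iterate(min, maxRetries)(d => (d * 2).min(max))

    delays(100.millis, 10.seconds, 10)
    // 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, 6.4s, 10s, 10s, 10s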
21 changes: 21 additions & 0 deletions config/config.nsq.extended.hocon
@@ -95,6 +95,27 @@
"maxRetries": 10
}
}

# Incomplete events output
"incomplete": {
"type": "Nsq"

# Name of the NSQ topic that will receive the incomplete events
"topic": "incomplete"

# The host name of nsqd application
"nsqdHost": "127.0.0.1"

# The port number of nsqd application
"nsqdPort": 4150

# Optional. Policy to retry if writing to NSQ fails
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}
}
}

# Optional. Concurrency of the app
25 changes: 25 additions & 0 deletions config/config.pubsub.extended.hocon
@@ -114,6 +114,31 @@
# Note the PubSub maximum is 10MB
"maxBatchBytes": 8000000
}

# Optional. Incomplete events output.
# If set, an incomplete enriched event holding the errors in derived_contexts will get emitted in addition to a bad row
"incomplete": {
"type": "PubSub"

# Name of the PubSub topic that will receive the incomplete events
"topic": "projects/test-project/topics/incomplete"

# Optional. Delay threshold to use for batching.
# After this amount of time has elapsed,
# before maxBatchSize and maxBatchBytes have been reached,
# messages from the buffer will be sent.
"delayThreshold": 200 milliseconds

# Optional. Maximum number of messages sent within a batch.
# When the buffer reaches this number of messages they are sent.
# PubSub maximum : 1000
"maxBatchSize": 1000

# Optional. Maximum number of bytes sent within a batch.
# When the buffer reaches this size messages are sent.
# Note the PubSub maximum is 10MB
"maxBatchBytes": 8000000
}
}

# Optional. Concurrency of the app
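
The three batching settings combine as a whichever-comes-first rule: a buffered batch is sent as soon as delayThreshold elapses, maxBatchSize messages accumulate, or maxBatchBytes is reached. A hypothetical sketch of that flush condition (not the actual PubSub client code):

    // Hypothetical: a PubSub-style batcher flushes when any threshold trips
    final case class BatchState(messages: Int, bytes: Long, ageMillis: Long)

    def shouldFlush(s: BatchState): Boolean =
      s.messages >= 1000 ||   // maxBatchSize
        s.bytes >= 8000000L ||  // maxBatchBytes
        s.ageMillis >= 200      // delayThreshold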
Enrich.scala
@@ -16,7 +16,7 @@ import java.util.Base64

import org.joda.time.DateTime

- import cats.data.{NonEmptyList, ValidatedNel}
+ import cats.data.{Ior, NonEmptyList, ValidatedNel}
import cats.{Monad, Parallel}
import cats.implicits._

@@ -72,7 +72,8 @@ object Enrich {
env.featureFlags,
env.metrics.invalidCount,
env.registryLookup,
- env.atomicFields
+ env.atomicFields,
+ env.sinkIncomplete.isDefined
)

val enriched =
@@ -119,7 +120,8 @@
featureFlags: FeatureFlags,
invalidCount: F[Unit],
registryLookup: RegistryLookup[F],
- atomicFields: AtomicFields
+ atomicFields: AtomicFields,
+ emitIncomplete: Boolean
)(
row: Array[Byte]
): F[Result] = {
@@ -140,7 +142,8 @@
FeatureFlags.toCommon(featureFlags),
invalidCount,
registryLookup,
- atomicFields
+ atomicFields,
+ emitIncomplete
)
} yield (enriched, collectorTstamp)

@@ -170,7 +173,7 @@
case None =>
Sync[F].unit
}
- } yield (List(badRow.invalid), collectorTstamp)
+ } yield (List(Ior.left(badRow)), collectorTstamp)

/** Build a `generic_error` bad row for unhandled runtime errors */
def genericBadRow(
@@ -189,17 +192,29 @@
chunk: List[Result],
env: Environment[F, A]
): F[Unit] = {
- val (bad, enriched) =
+ val (bad, enriched, incomplete) =
chunk
.flatMap(_._1)
- .map(_.toEither)
- .separate
+ .foldLeft((List.empty[BadRow], List.empty[EnrichedEvent], List.empty[EnrichedEvent])) {
+ case (previous, item) =>
+ val (bad, enriched, incomplete) = previous
+ item match {
+ case Ior.Right(e) => (bad, e :: enriched, incomplete)
+ case Ior.Left(br) => (br :: bad, enriched, incomplete)
+ case Ior.Both(br, i) => (br :: bad, enriched, i :: incomplete)
+ }
+ }

val (moreBad, good) = enriched.map { e =>
serializeEnriched(e, env.processor, env.streamsSettings.maxRecordSize)
.map(bytes => (e, AttributedData(bytes, env.goodPartitionKey(e), env.goodAttributes(e))))
}.separate

val (incompleteTooBig, incompleteBytes) = incomplete.map { e =>
serializeEnriched(e, env.processor, env.streamsSettings.maxRecordSize)
.map(bytes => AttributedData(bytes, env.goodPartitionKey(e), env.goodAttributes(e)))
}.separate

val allBad = (bad ++ moreBad).map(badRowResize(env, _))

List(
@@ -214,7 +229,10 @@
env.processor,
env.streamsSettings.maxRecordSize
) *> env.metrics.enrichLatency(chunk.headOption.flatMap(_._2)),
- sinkBad(allBad, env.sinkBad, env.metrics.badCount)
+ sinkBad(allBad, env.sinkBad, env.metrics.badCount),
if (incompleteTooBig.nonEmpty) Logger[F].warn(s"${incompleteTooBig.size} incomplete events discarded because they are too big")
else Sync[F].unit,
sinkIncomplete(incompleteBytes, env.sinkIncomplete, env.metrics.incompleteCount)
).parSequence_
}

@@ -272,6 +290,16 @@
Sync[F].unit
}

def sinkIncomplete[F[_]: Sync](
incomplete: List[AttributedData[Array[Byte]]],
maybeSink: Option[AttributedByteSink[F]],
incMetrics: Int => F[Unit]
): F[Unit] =
maybeSink match {
case Some(sink) => sink(incomplete) *> incMetrics(incomplete.size)
case None => Sync[F].unit
}

def serializeEnriched(
enriched: EnrichedEvent,
processor: Processor,
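
The switch from ValidatedNel to Ior above is the heart of the feature: Ior.Right carries a fully enriched event, Ior.Left a bad row, and Ior.Both a bad row together with a partially enriched (incomplete) event, so a single input can feed both the bad and the incomplete streams. A self-contained sketch of the same three-way split, with stand-in types for BadRow and EnrichedEvent:

    import cats.data.Ior

    type Bad      = String // stand-in for BadRow
    type Enriched = Int    // stand-in for EnrichedEvent

    def partition(results: List[Ior[Bad, Enriched]]): (List[Bad], List[Enriched], List[Enriched]) =
      results.foldLeft((List.empty[Bad], List.empty[Enriched], List.empty[Enriched])) {
        case ((bad, good, inc), Ior.Right(e))    => (bad, e :: good, inc)
        case ((bad, good, inc), Ior.Left(br))    => (br :: bad, good, inc)
        case ((bad, good, inc), Ior.Both(br, e)) => (br :: bad, good, e :: inc)
      }

    partition(List(Ior.Right(1), Ior.Left("oops"), Ior.Both("schema violation", 2)))
    // => (List("schema violation", "oops"), List(1), List(2))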
Environment.scala
@@ -78,6 +78,7 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata
* @param sinkGood function that sinks enriched event
* @param sinkPii function that sinks pii event
* @param sinkBad function that sinks an event that failed validation or enrichment
* @param sinkIncomplete function that sinks incomplete events
* @param checkpoint function that checkpoints input stream records
* @param getPayload function that extracts the collector payload bytes from a record
* @param sentry optional sentry client
@@ -111,6 +112,7 @@ final case class Environment[F[_], A](
sinkGood: AttributedByteSink[F],
sinkPii: Option[AttributedByteSink[F]],
sinkBad: ByteSink[F],
sinkIncomplete: Option[AttributedByteSink[F]],
checkpoint: List[A] => F[Unit],
getPayload: A => Array[Byte],
sentry: Option[SentryClient],
@@ -187,6 +189,7 @@ object Environment {
sinkGood: Resource[F, AttributedByteSink[F]],
sinkPii: Option[Resource[F, AttributedByteSink[F]]],
sinkBad: Resource[F, ByteSink[F]],
sinkIncomplete: Option[Resource[F, AttributedByteSink[F]]],
clients: Resource[F, List[Client[F]]],
checkpoint: List[A] => F[Unit],
getPayload: A => Array[Byte],
@@ -204,11 +207,12 @@
good <- sinkGood
bad <- sinkBad
pii <- sinkPii.sequence
incomplete <- sinkIncomplete.sequence
http4s <- Clients.mkHttp()
clts <- clients.map(Clients.init(http4s, _))
igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource
remoteAdaptersEnabled = file.remoteAdapters.configs.nonEmpty
- metrics <- Resource.eval(Metrics.build[F](file.monitoring.metrics, remoteAdaptersEnabled))
+ metrics <- Resource.eval(Metrics.build[F](file.monitoring.metrics, remoteAdaptersEnabled, incomplete.isDefined))
metadata <- Resource.eval(metadataReporter[F](file, processor.artifact, http4s))
assets = parsedConfigs.enrichmentConfigs.flatMap(_.filesToCache)
remoteAdapters <- prepareRemoteAdapters[F](file.remoteAdapters, metrics)
@@ -231,6 +235,7 @@
good,
pii,
bad,
incomplete,
checkpoint,
getPayload,
sentry,
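
Note how the optional sink threads through: it enters as Option[Resource[F, AttributedByteSink[F]]], and sinkIncomplete.sequence flips it into Resource[F, Option[AttributedByteSink[F]]], so nothing is allocated when no incomplete output is configured. A minimal sketch of that inversion, with an illustrative sink type:

    import cats.effect.{IO, Resource}
    import cats.implicits._

    type Sink = String => IO[Unit] // illustrative stand-in for AttributedByteSink

    val maybeSink: Option[Resource[IO, Sink]] =
      Some(Resource.pure[IO, Sink](s => IO.println(s"sunk: $s")))

    // Option[Resource[IO, Sink]] becomes Resource[IO, Option[Sink]]
    val flipped: Resource[IO, Option[Sink]] = maybeSink.sequence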
Run.scala
@@ -57,6 +57,7 @@ object Run {
mkSinkGood: Output => Resource[F, AttributedByteSink[F]],
mkSinkPii: Output => Resource[F, AttributedByteSink[F]],
mkSinkBad: Output => Resource[F, ByteSink[F]],
mkSinkIncomplete: Output => Resource[F, AttributedByteSink[F]],
checkpoint: List[A] => F[Unit],
mkClients: BlobStorageClients => List[Resource[F, Client[F]]],
getPayload: A => Array[Byte],
@@ -89,6 +90,7 @@
case _ =>
mkSinkBad(file.output.bad)
}
sinkIncomplete = file.output.incomplete.map(out => initAttributedSink(out, mkSinkIncomplete))
clients = mkClients(file.blobStorage).sequence
exit <- file.input match {
case p: Input.FileSystem =>
@@ -100,6 +102,7 @@
sinkGood,
sinkPii,
sinkBad,
sinkIncomplete,
clients,
_ => Sync[F].unit,
identity,
@@ -130,6 +133,7 @@
sinkGood,
sinkPii,
sinkBad,
sinkIncomplete,
clients,
checkpointing,
getPayload,
(Remaining changed files not shown.)