Commit

Batch transformer: use singleton badrows sink (close #1274)
pondzix committed Jun 22, 2023
1 parent 3d2874d commit a8f3ae4
Showing 2 changed files with 38 additions and 29 deletions.
File 1 of 2:
@@ -26,8 +26,8 @@ import com.snowplowanalytics.snowplow.rdbloader.common.transformation.Transforme
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.{AtomicFieldsProvider, NonAtomicFieldsProvider}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields
import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.Config.Output.BadSink
import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.badrows.{BadrowSink, GoodOnlyIterator, KinesisSink, WiderowFileSink}
import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.spark.singleton.EventParserSingleton
import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.badrows.GoodOnlyIterator
import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.spark.singleton.{BadrowSinkSingleton, EventParserSingleton}
import io.circe.Json
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
@@ -192,50 +192,32 @@ class ShredJob[T](
): RDD[Transformed] =
config.formats match {
case Formats.WideRow.PARQUET =>
val badrowSinkProvider = () => createBadrowsSinkForParquet(config.output.bad, folderName, hadoopConfigBroadcasted)
sinkBad(transformed, badrowSinkProvider, config.output.maxBadBufferSize)
sinkBad(transformed, folderName, hadoopConfigBroadcasted, config.output.maxBadBufferSize)

// For JSON - use custom Kinesis sink when configured.
// Current custom file sink would work, but as opposed to parquet, it's not really necessary, so we can still rely on classic Spark sink.
// For Shred - use custom Kinesis sink when configured.
// Current custom file sink doesn't support directory partitioning required by shredded output, so we have to rely on classic Spark sink.
case Formats.WideRow.JSON | _: Formats.Shred =>
config.output.bad match {
case kinesisConfig: BadSink.Kinesis =>
val badrowSinkProvider = () => KinesisSink.createFrom(kinesisConfig)
sinkBad(transformed, badrowSinkProvider, config.output.maxBadBufferSize)
case _: BadSink.Kinesis =>
sinkBad(transformed, folderName, hadoopConfigBroadcasted, config.output.maxBadBufferSize)
case BadSink.File =>
transformed // do nothing here, use Spark file sink for Json and shredded output format
}
}

// We don't want to use Spark sink for bad data when parquet format is configured, as it requires either:
// - using Spark cache OR
// - processing whole dataset twice
// Both options could affect performance, therefore we create custom sinks for both: Kinesis and file outputs.
// Spark is not used to sink bad data produced by parquet transformation.
private def createBadrowsSinkForParquet(
badConfig: Config.Output.BadSink,
folderName: String,
hadoopConfigBroadcasted: Broadcast[SerializableConfiguration]
): BadrowSink =
badConfig match {
case config: BadSink.Kinesis =>
KinesisSink.createFrom(config)
case BadSink.File =>
WiderowFileSink.create(folderName, hadoopConfigBroadcasted.value.value, config.output.path, config.output.compression)
}

private def sinkBad(
transformed: RDD[Transformed],
sinkProvider: () => BadrowSink,
folderName: String,
hadoopConfigBroadcasted: Broadcast[SerializableConfiguration],
badBufferMaxSize: Int
): RDD[Transformed] =
transformed.mapPartitionsWithIndex { case (partitionIndex, partitionData) =>
new GoodOnlyIterator(
partitionData.buffered,
partitionIndex,
sinkProvider(),
BadrowSinkSingleton.get(config, folderName, hadoopConfigBroadcasted),
badBufferMaxSize
)
}
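
The ShredJob change above replaces the () => BadrowSink provider that was captured in the Spark closure and invoked inside every partition (the sinkProvider parameter and the createBadrowsSinkForParquet helper) with a lookup through BadrowSinkSingleton.get, so sinkBad now receives folderName and the broadcast Hadoop configuration instead of a ready-made provider, and the Kinesis/file wiring moves into the singleton module. The sketch below is a minimal illustration of the difference, assuming a hypothetical CountingSink and plain Scala collections in place of the real BadrowSink implementations and RDD partitions; it is not the project's code.

import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, simplified stand-ins: the real BadrowSink, KinesisSink and
// WiderowFileSink live in the transformer codebase and have richer signatures.
trait BadrowSink {
  def sink(badrows: Iterator[String], partition: Int): Unit
}

object SinkCounter {
  val constructed = new AtomicInteger(0)
}

final class CountingSink extends BadrowSink {
  SinkCounter.constructed.incrementAndGet()
  override def sink(badrows: Iterator[String], partition: Int): Unit =
    badrows.foreach(_ => ()) // a real sink would write the rows to Kinesis or to files
}

object ProviderVsSingleton extends App {
  val partitions: Seq[Seq[String]] = Seq.fill(8)(Seq("bad-row-payload"))

  // Provider style (before this commit, sketched): the () => BadrowSink closure is
  // invoked once per partition, so every task builds its own sink.
  val provider: () => BadrowSink = () => new CountingSink
  partitions.zipWithIndex.foreach { case (rows, i) => provider().sink(rows.iterator, i) }
  println(s"provider style constructed ${SinkCounter.constructed.get()} sinks") // 8

  // Singleton style (after this commit, sketched): partitions resolve the sink through
  // a lazily initialised, JVM-wide instance, so one sink serves every task.
  SinkCounter.constructed.set(0)
  lazy val singleton: BadrowSink = new CountingSink
  partitions.zipWithIndex.foreach { case (rows, i) => singleton.sink(rows.iterator, i) }
  println(s"singleton style constructed ${SinkCounter.constructed.get()} sinks") // 1
}

In the real job the saving is per executor JVM: KinesisSink and WiderowFileSink hold a client or file handles, so building one instance per executor rather than one per partition avoids repeated construction, which is also why those classes are now imported by the singleton module instead of by ShredJob.
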
File 2 of 2:
@@ -18,19 +18,21 @@ import cats.Id
import cats.syntax.either._
import cats.syntax.option._
import cats.syntax.show._

import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup.idLookupInstance
import com.snowplowanalytics.iglu.schemaddl.Properties

import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.eventsmanifest.{EventsManifest, EventsManifestConfig}
import com.snowplowanalytics.snowplow.rdbloader.common._
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.EventUtils.EventParser
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.{EventUtils, PropertiesCache, PropertiesKey}
import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.Config
import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.Config.Output.BadSink
import com.snowplowanalytics.snowplow.rdbloader.transformer.batch.badrows.{BadrowSink, KinesisSink, WiderowFileSink}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.SerializableConfiguration

/** Singletons needed for unserializable or stateful classes. */
object singleton {
@@ -134,4 +136,29 @@ object singleton {
}
}

  object BadrowSinkSingleton {
    @volatile private var instance: BadrowSink = _

    def get(
      config: Config,
      folderName: String,
      hadoopConfigBroadcasted: Broadcast[SerializableConfiguration]
    ): BadrowSink = {
      if (instance == null) {
        synchronized {
          if (instance == null) {
            val sink = config.output.bad match {
              case config: BadSink.Kinesis =>
                KinesisSink.createFrom(config)
              case BadSink.File =>
                WiderowFileSink.create(folderName, hadoopConfigBroadcasted.value.value, config.output.path, config.output.compression)
            }
            instance = sink
          }
        }
      }
      instance
    }
  }

}
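
BadrowSinkSingleton above uses double-checked locking: the @volatile field makes the common path a single lock-free read once the sink exists, and the second null check inside synchronized ensures that only one of the tasks racing into get on the same executor actually constructs the sink. Below is a self-contained sketch of that idiom, with a hypothetical ExpensiveSink standing in for the project's KinesisSink and WiderowFileSink; it illustrates the pattern rather than the project's code.

// Hypothetical ExpensiveSink in place of the real sinks, which hold a Kinesis client
// or file handles and are therefore worth sharing across tasks.
final class ExpensiveSink

object ExpensiveSinkSingleton {
  @volatile private var instance: ExpensiveSink = _

  def get(): ExpensiveSink = {
    if (instance == null)     // fast path: one volatile read, no locking once initialised
      synchronized {
        if (instance == null) // re-check under the lock so racing callers build it only once
          instance = new ExpensiveSink
      }
    instance
  }
}

object DoubleCheckedLockingDemo extends App {
  // Simulate several tasks on one executor racing into get() at the same time.
  val seen = java.util.concurrent.ConcurrentHashMap.newKeySet[ExpensiveSink]()
  val tasks = (1 to 16).map(_ => new Thread(() => { seen.add(ExpensiveSinkSingleton.get()); () }))
  tasks.foreach(_.start())
  tasks.foreach(_.join())
  println(s"distinct sink instances: ${seen.size}") // always 1
}

Because Spark reuses executor JVMs across tasks and stages, the net effect in the transformer is one bad-rows sink per executor rather than one per partition.
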
