Databricks loader: Performance degradation for long running application - fixes (close #1029)
voropaevp authored and spenes committed Nov 16, 2022
1 parent 0cd1211 commit 1756e68
Showing 17 changed files with 208 additions and 18 deletions.
8 changes: 7 additions & 1 deletion config/loader/aws/databricks.config.reference.hocon
@@ -33,6 +33,9 @@
# User agent name for Databricks connection. Optional, default value "snowplow-rdbloader-oss"
"userAgent": "snowplow-rdbloader-oss"

# Optimize period per table, used as the predicate for the OPTIMIZE command.
"eventsOptimizePeriod": "2 days",

# Optional, default method is 'NoCreds'
# Specifies the auth method to use with the 'COPY INTO' statement.
"loadAuthMethod": {
@@ -70,7 +73,10 @@
# For how long the loader should be paused
"duration": "1 hour"
}
]
],
# The loader runs periodic OPTIMIZE statements to prevent a growing number of files behind Delta tables.
"optimizeEvents": "0 0 0 ? * *",
"optimizeManifest": "0 0 5 ? * *"
}
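
Both optimize schedules are 6-field cron expressions (seconds, minutes, hours, day-of-month, month, day-of-week) parsed with cron4s and evaluated in the JVM's default time zone (Cron4sScheduler.systemDefault), while eventsOptimizePeriod above is a HOCON-style duration. A minimal sketch of how the defaults parse, using only values already present in this commit:

import cron4s.Cron
import scala.concurrent.duration.Duration

// defaults shipped in application.conf: OPTIMIZE events daily at midnight, the manifest daily at 05:00
val optimizeEvents   = Cron.unsafeParse("0 0 0 ? * *")
val optimizeManifest = Cron.unsafeParse("0 0 5 ? * *")

// the per-table optimize window parses into a FiniteDuration
val eventsOptimizePeriod = Duration.create("2 days")
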

# Observability and reporting options
@@ -2,8 +2,9 @@
"storage": {
"type": "databricks"
"userAgent": "snowplow-rdbloader-oss"
"eventsOptimizePeriod": "2 days",
"loadAuthMethod": {
"type": "NoCreds"
"type": "NoCreds",
"roleSessionName": "rdb_loader"
}
}
@@ -171,6 +171,12 @@ object Databricks {
throw new IllegalStateException("Databricks Loader does not use EventsCopyToTempTable statement")
case _: Statement.EventsCopyFromTempTable =>
throw new IllegalStateException("Databricks Loader does not use EventsCopyFromTempTable statement")
case Statement.VacuumEvents => sql"""
OPTIMIZE ${Fragment.const0(qualify(EventsTable.MainName))}
WHERE collector_tstamp_date >= current_timestamp() - INTERVAL ${tgt.eventsOptimizePeriod.toSeconds} second"""
case Statement.VacuumManifest => sql"""
OPTIMIZE ${Fragment.const0(qualify(Manifest.Name))}
ZORDER BY base"""
}
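
The WHERE predicate confines OPTIMIZE to data written within eventsOptimizePeriod instead of rescanning the whole events table. A minimal sketch of the doobie fragment this case builds, with a hypothetical table name standing in for qualify(EventsTable.MainName):

import doobie.Fragment
import doobie.implicits._
import scala.concurrent.duration._

val eventsTable          = Fragment.const0("snowplow.atomic.events") // hypothetical qualified name
val eventsOptimizePeriod = 2.days                                    // tgt.eventsOptimizePeriod from the storage config

val optimizeEvents: Fragment =
  sql"""OPTIMIZE $eventsTable
        WHERE collector_tstamp_date >= current_timestamp() - INTERVAL ${eventsOptimizePeriod.toSeconds} second"""
// renders (modulo whitespace) as:
//   OPTIMIZE snowplow.atomic.events
//   WHERE collector_tstamp_date >= current_timestamp() - INTERVAL ? second
// with the single parameter bound to 172800
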

private def qualify(tableName: String): String = tgt.catalog match {
@@ -131,7 +131,8 @@ object DatabricksSpec {
StorageTarget.PasswordConfig.PlainText("xxx"),
None,
"useragent",
StorageTarget.LoadAuthMethod.NoCreds
StorageTarget.LoadAuthMethod.NoCreds,
2.days
),
Config.Cloud.AWS(
Region("eu-central-1"),
4 changes: 3 additions & 1 deletion modules/loader/src/main/resources/application.conf
@@ -11,7 +11,9 @@
}
},
"schedules": {
"noOperation": []
"noOperation": [],
"optimizeEvents": "0 0 0 ? * *",
"optimizeManifest": "0 0 5 ? * *"
},
"retries": {
"backoff": "30 seconds",
@@ -12,6 +12,7 @@
*/
package com.snowplowanalytics.snowplow.rdbloader


import scala.concurrent.duration._

import cats.{Applicative, Apply, Monad}
@@ -28,7 +29,7 @@ import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
import com.snowplowanalytics.snowplow.rdbloader.db.Columns._
import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, HealthCheck, Manifest, Statement, Control => DbControl}
import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, NoOperation, Retries}
import com.snowplowanalytics.snowplow.rdbloader.dsl.{Cache, DAO, FolderMonitoring, Iglu, Logging, Monitoring, StateMonitoring, Transaction}
import com.snowplowanalytics.snowplow.rdbloader.dsl.{Cache, DAO, FolderMonitoring, Iglu, Logging, Monitoring, StateMonitoring, Transaction, VacuumScheduling}
import com.snowplowanalytics.snowplow.rdbloader.dsl.Monitoring.AlertPayload
import com.snowplowanalytics.snowplow.rdbloader.loading.{EventsTable, Load, Stage, TargetCheck, Retry}
import com.snowplowanalytics.snowplow.rdbloader.loading.Retry._
@@ -42,7 +43,7 @@ object Loader {

/** How often Loader should print its internal state */
private val StateLoggingFrequency: FiniteDuration = 5.minutes

/** Restrict the length of an alert message to be compliant with alert iglu schema */
private val MaxAlertPayloadLength = 4096

@@ -64,6 +65,9 @@ object Loader {
FolderMonitoring.run[F, C](config.monitoring.folders, config.readyCheck, config.storage, control.isBusy)
val noOpScheduling: Stream[F, Unit] =
NoOperation.run(config.schedules.noOperation, control.makePaused, control.signal.map(_.loading))

val vacuumScheduling: Stream[F, Unit] =
VacuumScheduling.run[F, C](config.storage, config.schedules)
val healthCheck =
HealthCheck.start[F, C](config.monitoring.healthCheck)
val loading: Stream[F, Unit] =
@@ -96,6 +100,7 @@
.merge(stateLogging)
.merge(periodicMetrics)
.merge(telemetry.report)
.merge(vacuumScheduling)
}

process
@@ -19,6 +19,7 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
import cats.effect.Sync
import cats.data.EitherT
import cats.syntax.either._
import cats.syntax.option._

import io.circe._
import io.circe.generic.semiauto._
@@ -66,7 +67,9 @@
}

final case class Schedule(name: String, when: CronExpr, duration: FiniteDuration)
final case class Schedules(noOperation: List[Schedule])
final case class Schedules(noOperation: List[Schedule],
optimizeEvents: Option[CronExpr] = None,
optimizeManifest: Option[CronExpr] = None)
final case class Monitoring(snowplow: Option[SnowplowMonitoring], sentry: Option[Sentry], metrics: Metrics, webhook: Option[Webhook], folders: Option[Folders], healthCheck: Option[HealthCheck])
final case class SnowplowMonitoring(appId: String, collector: String)
final case class Sentry(dsn: URI)
@@ -132,6 +135,16 @@ object Config {
implicit val implRegionConfigDecoder: Decoder[Region] =
regionConfigDecoder

implicit val nullableCronExprDecoder: Decoder[Option[CronExpr]] = Decoder.instance { cur =>
cur.as[String] match {
case Left(other) => Left(other)
case Right(cred) => cred match {
case "" => Right(None)
case _ => cur.as[CronExpr].map(_.some)
}
}
}
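
This decoder lets an operator disable either OPTIMIZE schedule by setting it to an empty string. A behavioural sketch, assuming this implicit and the cron4s-circe Decoder[CronExpr] are in scope:

import io.circe.Json
import cron4s.CronExpr

Json.fromString("").as[Option[CronExpr]]            // Right(None): schedule disabled
Json.fromString("0 0 5 ? * *").as[Option[CronExpr]] // Right(Some(cronExpr)): daily at 05:00
Json.fromString("nope").as[Option[CronExpr]]        // Left(DecodingFailure): invalid cron expression
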

implicit val snowplowMonitoringDecoder: Decoder[SnowplowMonitoring] =
deriveDecoder[SnowplowMonitoring]

@@ -16,7 +16,6 @@ import java.util.Properties

import cats.data._
import cats.implicits._
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage

import io.circe._
import io.circe.Decoder._
@@ -26,7 +25,9 @@ import doobie.free.connection.setAutoCommit
import doobie.util.transactor.Strategy

import com.snowplowanalytics.snowplow.rdbloader.common.config.StringEnum
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage

import scala.concurrent.duration.{Duration, FiniteDuration}

/**
* Common configuration for JDBC target, such as Redshift
@@ -110,8 +111,9 @@ object StorageTarget {
password: PasswordConfig,
sshTunnel: Option[TunnelConfig],
userAgent: String,
loadAuthMethod: LoadAuthMethod
) extends StorageTarget {
loadAuthMethod: LoadAuthMethod,
eventsOptimizePeriod: FiniteDuration
) extends StorageTarget {

override def username: String = "token"

@@ -325,6 +327,17 @@ object StorageTarget {
implicit def redshiftConfigDecoder: Decoder[Redshift] =
deriveDecoder[Redshift]

implicit val durationDecoder: Decoder[FiniteDuration] =
Decoder[String].emap { str =>
Either
.catchOnly[NumberFormatException](Duration.create(str))
.leftMap(_.toString)
.flatMap { duration =>
if (duration.isFinite) Right(duration.asInstanceOf[FiniteDuration])
else Left(s"Cannot convert Duration $duration to FiniteDuration")
}
}
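
Duration.create accepts the HOCON-style duration strings used in the storage config (e.g. "2 days"), and the finiteness check rejects infinite values, since only a FiniteDuration can feed the OPTIMIZE predicate. A small illustration:

import scala.concurrent.duration.Duration

Duration.create("2 days") // 2 days as a FiniteDuration: accepted
Duration.create("Inf")    // Duration.Inf: rejected with "Cannot convert Duration Inf to FiniteDuration"
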

implicit def databricksConfigDecoder: Decoder[Databricks] =
deriveDecoder[Databricks]

@@ -106,4 +106,8 @@ object Statement {
case class CreateTable(ddl: Fragment) extends Statement
case class AlterTable(ddl: Fragment) extends Statement
case class DdlFile(ddl: Fragment) extends Statement

// Housekeeping statements (i.e. VACUUM in Redshift, OPTIMIZE in Databricks)
case object VacuumManifest extends Statement
case object VacuumEvents extends Statement
}
@@ -0,0 +1,69 @@
package com.snowplowanalytics.snowplow.rdbloader.dsl

import fs2.Stream
import retry._
import retry.syntax.all._
import cats.syntax.all._
import cats.effect.{Concurrent, MonadThrow, Timer}
import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
import com.snowplowanalytics.snowplow.rdbloader.db.Statement
import eu.timepit.fs2cron.ScheduledStreams
import eu.timepit.fs2cron.cron4s.Cron4sScheduler
import retry.RetryDetails.{GivingUp, WillDelayAndRetry}

import scala.concurrent.duration._

object VacuumScheduling {

def retryPolicy[F[_]: Timer: Concurrent]: RetryPolicy[F] =
RetryPolicies.fibonacciBackoff[F](1.minute) join RetryPolicies.limitRetries[F](10)

def logError[F[_]: Logging](err: Throwable, details: RetryDetails): F[Unit] = details match {

    case WillDelayAndRetry(nextDelay: FiniteDuration, retriesSoFar: Int, cumulativeDelay: FiniteDuration) =>
      Logging[F].warning(
        s"Failed to vacuum with ${err.getMessage}. So far we have retried $retriesSoFar times over $cumulativeDelay. Next attempt in $nextDelay."
      )

case GivingUp(totalRetries: Int, totalDelay: FiniteDuration) =>
Logging[F].error(
s"Failed to vacuum with ${err.getMessage}. Giving up after $totalRetries retries after $totalDelay."
)
}

  def run[F[_]: Transaction[*[_], C]: Concurrent: Logging: Timer, C[_]: DAO: MonadThrow: Logging](tgt: StorageTarget, cfg: Config.Schedules): Stream[F, Unit] = {
val vacuumEvents: Stream[F, Unit] = tgt match {
case _: StorageTarget.Databricks => cfg.optimizeEvents match {
case Some(cron) =>
new ScheduledStreams(Cron4sScheduler.systemDefault[F])
.awakeEvery(cron)
.evalMap { _ =>
Transaction[F, C].transact(
Logging[C].info("initiating events vacuum") *> DAO[C].executeQuery(Statement.VacuumEvents) *> Logging[C].info("vacuum events complete")
).retryingOnAllErrors(retryPolicy[F], logError[F]).orElse(().pure[F])
}
case _ => Stream.empty[F]
}
case _ => Stream.empty[F]
}

val vacuumManifest: Stream[F, Unit] = tgt match {
case _: StorageTarget.Databricks => cfg.optimizeManifest match {
case Some(cron) =>
new ScheduledStreams(Cron4sScheduler.systemDefault[F])
.awakeEvery(cron)
.evalMap { _ =>
Transaction[F, C].transact(
Logging[C].info("initiating manifest vacuum") *> DAO[C].executeQuery(Statement.VacuumManifest) *> Logging[C].info("vacuum manifest complete")
).retryingOnAllErrors(retryPolicy[F], logError[F]).orElse(().pure[F])
}
case _ => Stream.empty[F]
}
case _ => Stream.empty[F]
}

vacuumEvents merge vacuumManifest
}
}
@@ -124,11 +124,16 @@ object ConfigSpec {
)
val exampleSchedules: Config.Schedules = Config.Schedules(List(
Config.Schedule("Maintenance window", Cron.unsafeParse("0 0 12 * * ?"), 1.hour)
))
),
Some(Cron.unsafeParse("0 0 0 ? * *")),
Some(Cron.unsafeParse("0 0 5 ? * *"))
)
val exampleRetryQueue: Option[Config.RetryQueue] = Some(Config.RetryQueue(
30.minutes, 64, 3, 5.seconds
))
val emptySchedules: Config.Schedules = Config.Schedules(Nil)
val defaultSchedules: Config.Schedules = Config.Schedules(Nil,
Some(Cron.unsafeParse("0 0 0 ? * *")),
Some(Cron.unsafeParse("0 0 5 ? * *")))
val exampleTimeouts: Config.Timeouts = Config.Timeouts(1.hour, 10.minutes, 5.minutes)
val exampleRetries: Config.Retries = Config.Retries(Config.Strategy.Exponential, Some(3), 30.seconds, Some(1.hour))
val exampleReadyCheck: Config.Retries = Config.Retries(Config.Strategy.Constant, None, 15.seconds, None)
@@ -11,11 +11,14 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.config

import io.circe.{DecodingFailure, CursorOp}
import cron4s.Cron
import io.circe.{CursorOp, DecodingFailure}
import io.circe.literal._

import cats.syntax.all._
import org.specs2.mutable.Specification

import scala.concurrent.duration.DurationInt

class StorageTargetSpec extends Specification {
"StorageTarget" should {
"be parsed from valid JSON" in {
@@ -72,6 +75,26 @@ class StorageTargetSpec extends Specification {
}
}

"Schedules config" should {
"be parsed from valid JSON" in {
val input = json"""{
"noOperation": [],
"optimizeEvents": "",
"optimizeManifest": "*/3 * * ? * *"
}"""
val impl = Config.implicits()
import impl._

val expected: Config.Schedules = Config.Schedules(
noOperation = List.empty[Config.Schedule],
optimizeEvents = None,
optimizeManifest = Cron.unsafeParse("*/3 * * ? * *").some
)

input.as[Config.Schedules] must beRight(expected)
}
}

"TunnelConfig" should {
"be parsed from valid JSON" in {
val sshTunnel = json"""{
@@ -126,6 +149,39 @@
}
}

"Databricks " should {
"parse full config correctly" in {
val input = json"""{
"host": "databricks.com",
"schema": "snowplow",
"port": 443,
"httpPath": "http/path",
"password": "Supersecret1",
"userAgent": "snowplow-rdbloader-oss",
"eventsOptimizePeriod": "2 days",
"loadAuthMethod": {
"type": "NoCreds",
"roleSessionName": "rdb_loader"
}
}"""

val expected: StorageTarget.Databricks = StorageTarget.Databricks(
host = "databricks.com",
catalog = None,
schema = "snowplow",
port = 443,
httpPath = "http/path",
password = StorageTarget.PasswordConfig.PlainText("Supersecret1"),
sshTunnel = None,
userAgent = "snowplow-rdbloader-oss",
loadAuthMethod = StorageTarget.LoadAuthMethod.NoCreds,
eventsOptimizePeriod = 2.days
)

input.as[StorageTarget.Databricks] must beRight(expected)
}
}

"RedshiftJdbc" should {
"be parsed from valid JSON" in {
val input = json"""{
@@ -212,6 +212,7 @@ object FolderMonitoringSpec {
StorageTarget.PasswordConfig.PlainText("Supersecret1"),
None,
"snowplow-rdbloader-oss",
StorageTarget.LoadAuthMethod.NoCreds
StorageTarget.LoadAuthMethod.NoCreds,
2.days
)
}
@@ -240,6 +240,10 @@ object Redshift {
throw new IllegalStateException("Redshift Loader does not use EventsCopyToTempTable statement")
case _: Statement.EventsCopyFromTempTable =>
throw new IllegalStateException("Redshift Loader does not use EventsCopyFromTempTable statement")
case Statement.VacuumEvents =>
throw new IllegalStateException("Redshift Loader does not use vacuum events table statement")
case Statement.VacuumManifest =>
throw new IllegalStateException("Redshift Loader does not use vacuum manifest statement")
}

private def qualify(tableName: String): String =