From 1756e68c8a2df3d61e9a2a43f3f56c452a85cbd8 Mon Sep 17 00:00:00 2001
From: "pavel.voropaev"
Date: Wed, 24 Aug 2022 12:28:37 +0100
Subject: [PATCH] Databricks loader: fix performance degradation for
 long-running applications (close #1029)

---
 .../aws/databricks.config.reference.hocon          |  8 ++-
 .../src/main/resources/application.conf            |  3 +-
 .../loader/databricks/Databricks.scala             |  6 ++
 .../loader/databricks/DatabricksSpec.scala         |  3 +-
 .../src/main/resources/application.conf            |  4 +-
 .../snowplow/rdbloader/Loader.scala                |  9 ++-
 .../snowplow/rdbloader/config/Config.scala         | 15 +++-
 .../rdbloader/config/StorageTarget.scala           | 19 ++++-
 .../snowplow/rdbloader/db/Statement.scala          |  4 ++
 .../rdbloader/dsl/VacuumScheduling.scala           | 69 +++++++++++++++++++
 .../snowplow/rdbloader/ConfigSpec.scala            |  9 ++-
 .../rdbloader/config/StorageTargetSpec.scala       | 60 +++++++++++++++-
 .../rdbloader/dsl/FolderMonitoringSpec.scala       |  3 +-
 .../snowplow/loader/redshift/Redshift.scala        |  4 ++
 .../snowplow/loader/redshift/ConfigSpec.scala      |  2 +-
 .../snowplow/loader/snowflake/Snowflake.scala      |  4 ++
 .../loader/snowflake/ConfigSpec.scala              |  4 +-
 17 files changed, 208 insertions(+), 18 deletions(-)
 create mode 100644 modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/VacuumScheduling.scala

diff --git a/config/loader/aws/databricks.config.reference.hocon b/config/loader/aws/databricks.config.reference.hocon
index 31aa49370..1f487268e 100644
--- a/config/loader/aws/databricks.config.reference.hocon
+++ b/config/loader/aws/databricks.config.reference.hocon
@@ -33,6 +33,9 @@
   # User agent name for Databricks connection. Optional, default value "snowplow-rdbloader-oss"
   "userAgent": "snowplow-rdbloader-oss"
 
+  # Optimize period per table, which will be used as the predicate for the OPTIMIZE command.
+  "eventsOptimizePeriod": "2 days",
+
   # Optional, default method is 'NoCreds'
   # Specifies the auth method to use with 'COPY INTO' statement.
   "loadAuthMethod": {
@@ -70,7 +73,10 @@
       # For how long the loader should be paused
       "duration": "1 hour"
     }
-  ]
+  ],
+  # The loader runs periodic OPTIMIZE statements to prevent a growing number of files behind delta tables.
+  "optimizeEvents": "0 0 0 ? * *",
+  "optimizeManifest": "0 0 5 ? * *"
 }
 
 # Observability and reporting options
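
The two cron expressions above are the defaults for the new OPTIMIZE schedules. As a standalone sketch (not part of the patch) of how they behave when parsed with cron4s, the scheduling library the loader uses:

import cron4s.Cron

object CronDefaultsSketch extends App {
  // Fields are: second, minute, hour, day-of-month, month, day-of-week.
  // "0 0 0 ? * *" fires once a day at midnight: the events OPTIMIZE run.
  val optimizeEvents = Cron.unsafeParse("0 0 0 ? * *")

  // "0 0 5 ? * *" fires once a day at 05:00, offset from the events run.
  val optimizeManifest = Cron.unsafeParse("0 0 5 ? * *")

  println(optimizeEvents)   // 0 0 0 ? * *
  println(optimizeManifest) // 0 0 5 ? * *
}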
* *" } # Observability and reporting options diff --git a/modules/databricks-loader/src/main/resources/application.conf b/modules/databricks-loader/src/main/resources/application.conf index 1d5654306..4839c37f9 100644 --- a/modules/databricks-loader/src/main/resources/application.conf +++ b/modules/databricks-loader/src/main/resources/application.conf @@ -2,8 +2,9 @@ "storage": { "type": "databricks" "userAgent": "snowplow-rdbloader-oss" + "eventsOptimizePeriod": "2 days", "loadAuthMethod": { - "type": "NoCreds" + "type": "NoCreds", "roleSessionName": "rdb_loader" } } diff --git a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala index 6105ccf72..bf7f2d4cb 100644 --- a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala +++ b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala @@ -171,6 +171,12 @@ object Databricks { throw new IllegalStateException("Databricks Loader does not use EventsCopyToTempTable statement") case _: Statement.EventsCopyFromTempTable => throw new IllegalStateException("Databricks Loader does not use EventsCopyFromTempTable statement") + case Statement.VacuumEvents => sql""" + OPTIMIZE ${Fragment.const0(qualify(EventsTable.MainName))} + WHERE collector_tstamp_date >= current_timestamp() - INTERVAL ${tgt.eventsOptimizePeriod.toSeconds} second""" + case Statement.VacuumManifest => sql""" + OPTIMIZE ${Fragment.const0(qualify(Manifest.Name))} + ZORDER BY base""" } private def qualify(tableName: String): String = tgt.catalog match { diff --git a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala index 7a7700096..957f09979 100644 --- a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala +++ b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala @@ -131,7 +131,8 @@ object DatabricksSpec { StorageTarget.PasswordConfig.PlainText("xxx"), None, "useragent", - StorageTarget.LoadAuthMethod.NoCreds + StorageTarget.LoadAuthMethod.NoCreds, + 2.days ), Config.Cloud.AWS( Region("eu-central-1"), diff --git a/modules/loader/src/main/resources/application.conf b/modules/loader/src/main/resources/application.conf index 61195f3dc..e4e998928 100644 --- a/modules/loader/src/main/resources/application.conf +++ b/modules/loader/src/main/resources/application.conf @@ -11,7 +11,9 @@ } }, "schedules": { - "noOperation": [] + "noOperation": [], + "optimizeEvents": "0 0 0 ? * *", + "optimizeManifest": "0 0 5 ? 
* *" }, "retries": { "backoff": "30 seconds", diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala index f939e27d3..ede522f8b 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala @@ -12,6 +12,7 @@ */ package com.snowplowanalytics.snowplow.rdbloader + import scala.concurrent.duration._ import cats.{Applicative, Apply, Monad} @@ -28,7 +29,7 @@ import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} import com.snowplowanalytics.snowplow.rdbloader.db.Columns._ import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, HealthCheck, Manifest, Statement, Control => DbControl} import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, NoOperation, Retries} -import com.snowplowanalytics.snowplow.rdbloader.dsl.{Cache, DAO, FolderMonitoring, Iglu, Logging, Monitoring, StateMonitoring, Transaction} +import com.snowplowanalytics.snowplow.rdbloader.dsl.{Cache, DAO, FolderMonitoring, Iglu, Logging, Monitoring, StateMonitoring, Transaction, VacuumScheduling} import com.snowplowanalytics.snowplow.rdbloader.dsl.Monitoring.AlertPayload import com.snowplowanalytics.snowplow.rdbloader.loading.{EventsTable, Load, Stage, TargetCheck, Retry} import com.snowplowanalytics.snowplow.rdbloader.loading.Retry._ @@ -42,7 +43,7 @@ object Loader { /** How often Loader should print its internal state */ private val StateLoggingFrequency: FiniteDuration = 5.minutes - + /** Restrict the length of an alert message to be compliant with alert iglu schema */ private val MaxAlertPayloadLength = 4096 @@ -64,6 +65,9 @@ object Loader { FolderMonitoring.run[F, C](config.monitoring.folders, config.readyCheck, config.storage, control.isBusy) val noOpScheduling: Stream[F, Unit] = NoOperation.run(config.schedules.noOperation, control.makePaused, control.signal.map(_.loading)) + + val vacuumScheduling: Stream[F, Unit] = + VacuumScheduling.run[F, C](config.storage, config.schedules) val healthCheck = HealthCheck.start[F, C](config.monitoring.healthCheck) val loading: Stream[F, Unit] = @@ -96,6 +100,7 @@ object Loader { .merge(stateLogging) .merge(periodicMetrics) .merge(telemetry.report) + .merge(vacuumScheduling) } process diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala index 26116b03c..ac4a981d7 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala @@ -19,6 +19,7 @@ import scala.concurrent.duration.{Duration, FiniteDuration} import cats.effect.Sync import cats.data.EitherT import cats.syntax.either._ +import cats.syntax.option._ import io.circe._ import io.circe.generic.semiauto._ @@ -66,7 +67,9 @@ object Config { } final case class Schedule(name: String, when: CronExpr, duration: FiniteDuration) - final case class Schedules(noOperation: List[Schedule]) + final case class Schedules(noOperation: List[Schedule], + optimizeEvents: Option[CronExpr] = None, + optimizeManifest: Option[CronExpr] = None) final case class Monitoring(snowplow: Option[SnowplowMonitoring], sentry: Option[Sentry], metrics: Metrics, webhook: Option[Webhook], folders: 
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala
index 26116b03c..ac4a981d7 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala
@@ -19,6 +19,7 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
 import cats.effect.Sync
 import cats.data.EitherT
 import cats.syntax.either._
+import cats.syntax.option._
 
 import io.circe._
 import io.circe.generic.semiauto._
@@ -66,7 +67,9 @@ object Config {
   }
 
   final case class Schedule(name: String, when: CronExpr, duration: FiniteDuration)
-  final case class Schedules(noOperation: List[Schedule])
+  final case class Schedules(noOperation: List[Schedule],
+                             optimizeEvents: Option[CronExpr] = None,
+                             optimizeManifest: Option[CronExpr] = None)
 
   final case class Monitoring(snowplow: Option[SnowplowMonitoring], sentry: Option[Sentry], metrics: Metrics, webhook: Option[Webhook], folders: Option[Folders], healthCheck: Option[HealthCheck])
   final case class SnowplowMonitoring(appId: String, collector: String)
   final case class Sentry(dsn: URI)
@@ -132,6 +135,16 @@ object Config {
   implicit val implRegionConfigDecoder: Decoder[Region] =
     regionConfigDecoder
 
+  implicit val nullableCronExprDecoder: Decoder[Option[CronExpr]] = Decoder.instance { cur =>
+    cur.as[String] match {
+      case Left(other) => Left(other)
+      case Right(cred) => cred match {
+        case "" => Right(None)
+        case _  => cur.as[CronExpr].map(_.some)
+      }
+    }
+  }
+
   implicit val snowplowMonitoringDecoder: Decoder[SnowplowMonitoring] =
     deriveDecoder[SnowplowMonitoring]
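
The nullableCronExprDecoder above makes an empty string mean "schedule disabled", while any other value must parse as a cron expression. A self-contained approximation of those semantics (the real decoder delegates to the CronExpr decoder already in implicit scope; this sketch calls cron4s' Cron.parse directly):

import cats.syntax.option._
import cron4s.{Cron, CronExpr}
import io.circe.{Decoder, Json}

object CronDecoderSketch extends App {
  implicit val optionalCron: Decoder[Option[CronExpr]] =
    Decoder[String].emap {
      case ""  => Right(None)                                  // "" disables the schedule
      case str => Cron.parse(str).left.map(_.toString).map(_.some)
    }

  println(Json.fromString("").as[Option[CronExpr]])             // Right(None)
  println(Json.fromString("0 0 5 ? * *").as[Option[CronExpr]])  // Right(Some(0 0 5 ? * *))
  println(Json.fromString("bogus").as[Option[CronExpr]].isLeft) // true
}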
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala
index 855e0dab6..82a2f39de 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala
@@ -16,7 +16,6 @@ import java.util.Properties
 
 import cats.data._
 import cats.implicits._
-import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
 
 import io.circe._
 import io.circe.Decoder._
@@ -26,7 +25,9 @@ import doobie.free.connection.setAutoCommit
 import doobie.util.transactor.Strategy
 
 import com.snowplowanalytics.snowplow.rdbloader.common.config.StringEnum
+import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
 
+import scala.concurrent.duration.{Duration, FiniteDuration}
 
 /**
  * Common configuration for JDBC target, such as Redshift
@@ -110,8 +111,9 @@ object StorageTarget {
     password: PasswordConfig,
     sshTunnel: Option[TunnelConfig],
     userAgent: String,
-    loadAuthMethod: LoadAuthMethod
-  ) extends StorageTarget {
+    loadAuthMethod: LoadAuthMethod,
+    eventsOptimizePeriod: FiniteDuration
+  ) extends StorageTarget {
 
     override def username: String = "token"
@@ -325,6 +327,17 @@ object StorageTarget {
   implicit def redshiftConfigDecoder: Decoder[Redshift] =
     deriveDecoder[Redshift]
 
+  implicit val durationDecoder: Decoder[FiniteDuration] =
+    Decoder[String].emap { str =>
+      Either
+        .catchOnly[NumberFormatException](Duration.create(str))
+        .leftMap(_.toString)
+        .flatMap { duration =>
+          if (duration.isFinite) Right(duration.asInstanceOf[FiniteDuration])
+          else Left(s"Cannot convert Duration $duration to FiniteDuration")
+        }
+    }
+
   implicit def databricksConfigDecoder: Decoder[Databricks] =
     deriveDecoder[Databricks]
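
durationDecoder above accepts the scala.concurrent.duration string syntax but rejects anything non-finite, since only a finite period makes sense as an OPTIMIZE window. A sketch of equivalent behaviour, using Try and a type match in place of Either.catchOnly and the cast:

import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.Try

object DurationDecoderSketch extends App {
  def decode(str: String): Either[String, FiniteDuration] =
    Try(Duration.create(str)).toEither
      .left.map(_.toString)
      .flatMap {
        case fd: FiniteDuration => Right(fd)
        case inf                => Left(s"Cannot convert Duration $inf to FiniteDuration")
      }

  println(decode("2 days"))   // Right(2 days) -- the default eventsOptimizePeriod
  println(decode("Inf"))      // Left(...): infinite durations are rejected
  println(decode("nonsense")) // Left(...): unparseable strings fail decoding
}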
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala
index a166b92bb..47645f273 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala
@@ -106,4 +106,8 @@ object Statement {
   case class CreateTable(ddl: Fragment) extends Statement
   case class AlterTable(ddl: Fragment) extends Statement
   case class DdlFile(ddl: Fragment) extends Statement
+
+  // Optimize (housekeeping, i.e. VACUUM in Redshift, OPTIMIZE in Databricks)
+  case object VacuumManifest extends Statement
+  case object VacuumEvents extends Statement
 }
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/VacuumScheduling.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/VacuumScheduling.scala
new file mode 100644
index 000000000..0b8ba65c3
--- /dev/null
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/VacuumScheduling.scala
@@ -0,0 +1,69 @@
+package com.snowplowanalytics.snowplow.rdbloader.dsl
+
+import fs2.Stream
+import retry._
+import retry.syntax.all._
+import cats.syntax.all._
+import cats.effect.{Concurrent, MonadThrow, Timer}
+import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
+import com.snowplowanalytics.snowplow.rdbloader.db.Statement
+import eu.timepit.fs2cron.ScheduledStreams
+import eu.timepit.fs2cron.cron4s.Cron4sScheduler
+import retry.RetryDetails.{GivingUp, WillDelayAndRetry}
+
+import scala.concurrent.duration._
+
+object VacuumScheduling {
+
+  def retryPolicy[F[_]: Timer: Concurrent]: RetryPolicy[F] =
+    RetryPolicies.fibonacciBackoff[F](1.minute) join RetryPolicies.limitRetries[F](10)
+
+  def logError[F[_]: Logging](err: Throwable, details: RetryDetails): F[Unit] = details match {
+    case WillDelayAndRetry(nextDelay: FiniteDuration, retriesSoFar: Int, cumulativeDelay: FiniteDuration) =>
+      Logging[F].warning(
+        s"Failed to vacuum with ${err.getMessage}. So far we have retried $retriesSoFar times over $cumulativeDelay. Next attempt in $nextDelay."
+      )
+    case GivingUp(totalRetries: Int, totalDelay: FiniteDuration) =>
+      Logging[F].error(
+        s"Failed to vacuum with ${err.getMessage}. Giving up after $totalRetries retries and a total delay of $totalDelay."
+      )
+  }
+
+  def run[F[_]: Transaction[*[_], C]: Concurrent: Logging: Timer, C[_]: DAO: MonadThrow: Logging](tgt: StorageTarget, cfg: Config.Schedules): Stream[F, Unit] = {
+    val vacuumEvents: Stream[F, Unit] = tgt match {
+      case _: StorageTarget.Databricks =>
+        cfg.optimizeEvents match {
+          case Some(cron) =>
+            new ScheduledStreams(Cron4sScheduler.systemDefault[F])
+              .awakeEvery(cron)
+              .evalMap { _ =>
+                Transaction[F, C]
+                  .transact(
+                    Logging[C].info("initiating events vacuum") *>
+                      DAO[C].executeQuery(Statement.VacuumEvents) *>
+                      Logging[C].info("vacuum events complete")
+                  )
+                  .retryingOnAllErrors(retryPolicy[F], logError[F])
+                  .orElse(().pure[F])
+              }
+          case _ => Stream.empty[F]
+        }
+      case _ => Stream.empty[F]
+    }
+
+    val vacuumManifest: Stream[F, Unit] = tgt match {
+      case _: StorageTarget.Databricks =>
+        cfg.optimizeManifest match {
+          case Some(cron) =>
+            new ScheduledStreams(Cron4sScheduler.systemDefault[F])
+              .awakeEvery(cron)
+              .evalMap { _ =>
+                Transaction[F, C]
+                  .transact(
+                    Logging[C].info("initiating manifest vacuum") *>
+                      DAO[C].executeQuery(Statement.VacuumManifest) *>
+                      Logging[C].info("vacuum manifest complete")
+                  )
+                  .retryingOnAllErrors(retryPolicy[F], logError[F])
+                  .orElse(().pure[F])
+              }
+          case _ => Stream.empty[F]
+        }
+      case _ => Stream.empty[F]
+    }
+
+    vacuumEvents merge vacuumManifest
+  }
+}
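
retryPolicy above composes a Fibonacci backoff (base 1 minute) with a cap of 10 retries; `join` retries only while both policies agree and takes the longer of the two delays. A standalone sketch that walks the policy decision by decision through cats-retry's public RetryStatus/PolicyDecision API:

import cats.effect.IO
import cats.syntax.all._
import retry.{PolicyDecision, RetryPolicies, RetryPolicy, RetryStatus}
import scala.concurrent.duration._

object RetryPolicySketch extends App {
  val policy: RetryPolicy[IO] =
    RetryPolicies.fibonacciBackoff[IO](1.minute) join RetryPolicies.limitRetries[IO](10)

  // Prints retry #1..#10 with delays of 1, 1, 2, 3, 5, ... minutes,
  // then "giving up" once limitRetries(10) is exhausted.
  def walk(status: RetryStatus): IO[Unit] =
    policy.decideNextRetry(status).flatMap {
      case PolicyDecision.DelayAndRetry(delay) =>
        IO(println(s"retry #${status.retriesSoFar + 1} after $delay")) *>
          walk(status.addRetry(delay))
      case PolicyDecision.GiveUp =>
        IO(println("giving up"))
    }

  walk(RetryStatus.NoRetriesYet).unsafeRunSync()
}

The .orElse(().pure[F]) in run then swallows the final failure, so an OPTIMIZE that keeps failing never terminates the loader's main stream.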
* *"))) val exampleTimeouts: Config.Timeouts = Config.Timeouts(1.hour, 10.minutes, 5.minutes) val exampleRetries: Config.Retries = Config.Retries(Config.Strategy.Exponential, Some(3), 30.seconds, Some(1.hour)) val exampleReadyCheck: Config.Retries = Config.Retries(Config.Strategy.Constant, None, 15.seconds, None) diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala index af4f5f500..84af91439 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala @@ -11,11 +11,14 @@ */ package com.snowplowanalytics.snowplow.rdbloader.config -import io.circe.{DecodingFailure, CursorOp} +import cron4s.Cron +import io.circe.{CursorOp, DecodingFailure} import io.circe.literal._ - +import cats.syntax.all._ import org.specs2.mutable.Specification +import scala.concurrent.duration.DurationInt + class StorageTargetSpec extends Specification { "StorageTarget" should { "be parsed from valid JSON" in { @@ -72,6 +75,26 @@ class StorageTargetSpec extends Specification { } } + "Schedules config" should { + "be parsed from valid JSON" in { + val input = json"""{ + "noOperation": [], + "optimizeEvents": "", + "optimizeManifest": "*/3 * * ? * *" + }""" + val impl = Config.implicits() + import impl._ + + val expected: Config.Schedules = Config.Schedules( + noOperation = List.empty[Config.Schedule], + optimizeEvents = None, + optimizeManifest = Cron.unsafeParse("*/3 * * ? * *").some + ) + + input.as[Config.Schedules] must beRight(expected) + } + } + "TunnelConfig" should { "be parsed from valid JSON" in { val sshTunnel = json"""{ @@ -126,6 +149,39 @@ class StorageTargetSpec extends Specification { } } + "Databricks " should { + "parse full config correctly" in { + val input = json"""{ + "host": "databricks.com", + "schema": "snowplow", + "port": 443, + "httpPath": "http/path", + "password": "Supersecret1", + "userAgent": "snowplow-rdbloader-oss", + "eventsOptimizePeriod": "2 days", + "loadAuthMethod": { + "type": "NoCreds", + "roleSessionName": "rdb_loader" + } + }""" + + val expected: StorageTarget.Databricks = StorageTarget.Databricks( + host = "databricks.com", + catalog = None, + schema = "snowplow", + port = 443, + httpPath = "http/path", + password = StorageTarget.PasswordConfig.PlainText("Supersecret1"), + sshTunnel = None, + userAgent = "snowplow-rdbloader-oss", + loadAuthMethod = StorageTarget.LoadAuthMethod.NoCreds, + eventsOptimizePeriod = 2.days + ) + + input.as[StorageTarget.Databricks] must beRight(expected) + } + } + "RedshiftJdbc" should { "be parsed from valid JSON" in { val input = json"""{ diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala index e62f98a1b..52164a92b 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala @@ -212,6 +212,7 @@ object FolderMonitoringSpec { StorageTarget.PasswordConfig.PlainText("Supersecret1"), None, "snowplow-rdbloader-oss", - StorageTarget.LoadAuthMethod.NoCreds + StorageTarget.LoadAuthMethod.NoCreds, + 2.days ) } diff 
diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala
index e62f98a1b..52164a92b 100644
--- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala
+++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala
@@ -212,6 +212,7 @@ object FolderMonitoringSpec {
     StorageTarget.PasswordConfig.PlainText("Supersecret1"),
     None,
     "snowplow-rdbloader-oss",
-    StorageTarget.LoadAuthMethod.NoCreds
+    StorageTarget.LoadAuthMethod.NoCreds,
+    2.days
   )
 }
diff --git a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala
index e68b4ef31..dd3b78629 100644
--- a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala
+++ b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala
@@ -240,6 +240,10 @@ object Redshift {
         throw new IllegalStateException("Redshift Loader does not use EventsCopyToTempTable statement")
       case _: Statement.EventsCopyFromTempTable =>
         throw new IllegalStateException("Redshift Loader does not use EventsCopyFromTempTable statement")
+      case Statement.VacuumEvents =>
+        throw new IllegalStateException("Redshift Loader does not use vacuum events table statement")
+      case Statement.VacuumManifest =>
+        throw new IllegalStateException("Redshift Loader does not use vacuum manifest statement")
     }
 
   private def qualify(tableName: String): String =
diff --git a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/ConfigSpec.scala b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/ConfigSpec.scala
index a41714fc1..60ad0b34c 100644
--- a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/ConfigSpec.scala
+++ b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/ConfigSpec.scala
@@ -52,7 +52,7 @@ class ConfigSpec extends Specification {
         None,
         defaultMonitoring,
         None,
-        emptySchedules,
+        defaultSchedules,
         exampleTimeouts,
         exampleRetries.copy(cumulativeBound = None),
         exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds),
diff --git a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala
index 99315f6b8..c1f74a664 100644
--- a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala
+++ b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala
@@ -242,6 +242,10 @@ object Snowflake {
         ddl
       case Statement.AlterTable(ddl) =>
         ddl
+      case Statement.VacuumEvents =>
+        throw new IllegalStateException("Snowflake Loader does not use vacuum events table statement")
+      case Statement.VacuumManifest =>
+        throw new IllegalStateException("Snowflake Loader does not use vacuum manifest statement")
     }
 
   private def qualify(tableName: String): String =
diff --git a/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/ConfigSpec.scala b/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/ConfigSpec.scala
index 8609765f0..2ac363c4b 100644
--- a/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/ConfigSpec.scala
+++ b/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/ConfigSpec.scala
@@ -103,7 +103,7 @@ class ConfigSpec extends Specification {
         None,
         defaultMonitoring,
         None,
-        emptySchedules,
+        defaultSchedules,
         exampleTimeouts,
         exampleRetries.copy(cumulativeBound = None),
         exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds),
@@ -133,7 +133,7 @@ class ConfigSpec extends Specification {
         None,
         defaultMonitoring,
         None,
-        emptySchedules,
+        defaultSchedules,
         exampleTimeouts,
         exampleRetries.copy(cumulativeBound = None),
         exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds),