RDB Loader: Timeouts on JDBC statements (close #914)
istreeter authored and spenes committed Jun 7, 2022
1 parent 2821f28 commit ff75ca0
Showing 15 changed files with 57 additions and 58 deletions.
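
The substance of the commit is in DAO.scala below: every JDBC statement now runs with a query timeout, chosen according to whether the statement is a potentially long-running load or a quick housekeeping query. At the driver level this rests on java.sql.Statement.setQueryTimeout, which doobie's HPS.setQueryTimeout wraps; a minimal standalone sketch of the mechanism (connection URL and SQL are placeholders, not taken from this commit):

import java.sql.{Connection, DriverManager, PreparedStatement}

object QueryTimeoutSketch extends App {
  // Placeholder URL; most JDBC drivers honour setQueryTimeout the same way.
  val conn: Connection = DriverManager.getConnection("jdbc:postgresql://localhost/dev")
  val stmt: PreparedStatement = conn.prepareStatement("SELECT 1")
  stmt.setQueryTimeout(30) // the driver cancels the statement if it runs longer than 30s
  val result = stmt.executeQuery()
}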
build.sbt: 2 changes (1 addition, 1 deletion)
@@ -12,7 +12,7 @@
*/

lazy val root = project.in(file("."))
- .aggregate(common, aws, loader, redshiftLoader, snowflakeLoader, transformerBatch, transformerKinesis)
+ .aggregate(common, aws, loader, databricksLoader, redshiftLoader, snowflakeLoader, transformerBatch, transformerKinesis)

lazy val aws = project
.in(file("modules/aws"))
@@ -81,9 +81,6 @@ object Databricks {

def toFragment(statement: Statement): Fragment =
statement match {
- case Statement.Begin => sql"BEGIN"
- case Statement.Commit => sql"COMMIT"
- case Statement.Abort => sql"ABORT"
case Statement.Select1 => sql"SELECT 1"
case Statement.ReadyCheck => sql"SELECT 1"

@@ -155,7 +155,7 @@ object Loader {
_ <- if (columns.map(_.toLowerCase).contains(AtomicColumns.ColumnsWithDefault.LoadTstamp))
Logging[F].info("load_tstamp column already exists")
else
- DAO[F].executeUpdate(Statement.AddLoadTstampColumn).void *>
+ DAO[F].executeUpdate(Statement.AddLoadTstampColumn, DAO.Purpose.NonLoading).void *>
Logging[F].info("load_tstamp column is added successfully")
} yield ()
}
@@ -20,14 +20,14 @@ import com.snowplowanalytics.snowplow.rdbloader.dsl.DAO
/** Set of common functions to control DB entities */
object Control {
def renameTable[F[_]: Functor: DAO](from: String, to: String): F[Unit] =
- DAO[F].executeUpdate(Statement.RenameTable(from, to)).void
+ DAO[F].executeUpdate(Statement.RenameTable(from, to), DAO.Purpose.NonLoading).void

def tableExists[F[_]: DAO](tableName: String): F[Boolean] =
DAO[F].executeQuery[Boolean](Statement.TableExists(tableName))

def getColumns[F[_]: Monad: DAO](tableName: String): F[List[String]] =
for {
- _ <- DAO[F].executeUpdate(Statement.SetSchema)
+ _ <- DAO[F].executeUpdate(Statement.SetSchema, DAO.Purpose.NonLoading)
columns <- DAO[F].executeQueryList[String](Statement.GetColumns(tableName))
} yield columns
}
@@ -63,7 +63,7 @@ object Manifest {
_ <- status match {
case InitStatus.Migrated | InitStatus.Created =>
target match {
- case _: Redshift => DAO[F].executeUpdate(Statement.CommentOn(s"$schema.$Name", "0.2.0"))
+ case _: Redshift => DAO[F].executeUpdate(Statement.CommentOn(s"$schema.$Name", "0.2.0"), DAO.Purpose.NonLoading)
case _ => Monad[F].unit
}
case _ =>
@@ -73,14 +73,14 @@
}

def add[F[_]: DAO: Functor](item: LoaderMessage.ManifestItem): F[Unit] =
- DAO[F].executeUpdate(Statement.ManifestAdd(item)).void
+ DAO[F].executeUpdate(Statement.ManifestAdd(item), DAO.Purpose.NonLoading).void

def get[F[_]: DAO](base: S3.Folder): F[Option[Entry]] =
DAO[F].executeQueryOption[Entry](Statement.ManifestGet(base))(Entry.entryRead)

/** Create manifest table */
def create[F[_]: DAO: Functor]: F[Unit] =
- DAO[F].executeUpdate(DAO[F].target.getManifest).void
+ DAO[F].executeUpdate(DAO[F].target.getManifest, DAO.Purpose.NonLoading).void


case class Entry(ingestion: Instant, meta: LoaderMessage.ManifestItem)
@@ -216,35 +216,35 @@ object Migration {
getPredicate[F](blocks).map { shouldAdd =>
blocks.foldLeft(Migration.empty[F]) {
case (migration, block) if block.isEmpty =>
- val action = DAO[F].executeUpdate(block.getCommentOn) *> Logging[F].warning(s"Empty migration for ${block.getName}")
+ val action = DAO[F].executeUpdate(block.getCommentOn, DAO.Purpose.NonLoading) *> Logging[F].warning(s"Empty migration for ${block.getName}")
migration.addPreTransaction(action)

case (migration, b@Block(pre, in, entity)) if pre.nonEmpty && in.nonEmpty =>
val preAction = preMigration[F](shouldAdd, entity, pre)
val inAction = Logging[F].info(s"Migrating ${b.getName} (in-transaction)") *>
- in.traverse_(item => DAO[F].executeUpdate(item.statement)) *>
- DAO[F].executeUpdate(b.getCommentOn) *>
+ in.traverse_(item => DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading)) *>
+ DAO[F].executeUpdate(b.getCommentOn, DAO.Purpose.NonLoading) *>
Logging[F].info(s"${b.getName} migration completed")
migration.addPreTransaction(preAction).addInTransaction(inAction)

case (migration, b@Block(Nil, in, target)) if b.isCreation =>
val inAction = Logging[F].info(s"Creating ${b.getName} table for ${target.getInfo.toSchemaUri}") *>
- in.traverse_(item => DAO[F].executeUpdate(item.statement)) *>
- DAO[F].executeUpdate(b.getCommentOn) *>
+ in.traverse_(item => DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading)) *>
+ DAO[F].executeUpdate(b.getCommentOn, DAO.Purpose.NonLoading) *>
Logging[F].info("Table created")
migration.addInTransaction(inAction)

case (migration, b@Block(Nil, in, _)) =>
val inAction = Logging[F].info(s"Migrating ${b.getName} (in-transaction)") *>
- in.traverse_(item => DAO[F].executeUpdate(item.statement)) *>
- DAO[F].executeUpdate(b.getCommentOn) *>
+ in.traverse_(item => DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading)) *>
+ DAO[F].executeUpdate(b.getCommentOn, DAO.Purpose.NonLoading) *>
Logging[F].info(s"${b.getName} migration completed")
migration.addInTransaction(inAction)

case (migration, b@Block(pre, Nil, Entity.Table(_, _))) =>
val preAction = Logging[F].info(s"Migrating ${b.getName} (pre-transaction)") *>
- pre.traverse_(item => DAO[F].executeUpdate(item.statement).void) *>
- DAO[F].executeUpdate(b.getCommentOn).void *>
+ pre.traverse_(item => DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading).void) *>
+ DAO[F].executeUpdate(b.getCommentOn, DAO.Purpose.NonLoading).void *>
Logging[F].info(s"${b.getName} migration completed")
migration.addPreTransaction(preAction)

@@ -258,7 +258,7 @@
def preMigration[F[_]: DAO: Logging: Monad](shouldAdd: Entity => Boolean, entity: Entity, items: List[Item]) =
if (shouldAdd(entity))
Logging[F].info(s"Migrating ${entity.getName} (pre-transaction)") *>
- items.traverse_(item => DAO[F].executeUpdate(item.statement).void)
+ items.traverse_(item => DAO[F].executeUpdate(item.statement, DAO.Purpose.NonLoading).void)
else Monad[F].unit

def emptyBlock[F[_]: Monad]: F[Option[Block]] =
@@ -43,9 +43,6 @@ object Statement {
}

// Common
- case object Begin extends Statement
- case object Commit extends Statement
- case object Abort extends Statement
case object Select1 extends Statement
case object ReadyCheck extends Statement
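
With Begin, Commit and Abort gone from the Statement ADT (and rollback gone from DAO below), no transaction-control SQL is issued by hand any more; doobie's Transactor already brackets each ConnectionIO with begin/commit, rolling back on failure. A minimal sketch of that idiom, assuming a hypothetical Transactor[IO] named xa (the ALTER TABLE statement is illustrative only):

import cats.effect.IO
import doobie.implicits._
import doobie.util.transactor.Transactor

object TransactSketch {
  // The default Transactor strategy disables auto-commit, commits on
  // success, and rolls back if the ConnectionIO fails.
  def addLoadTstamp(xa: Transactor[IO]): IO[Int] =
    sql"ALTER TABLE atomic.events ADD COLUMN load_tstamp TIMESTAMP".update.run.transact(xa)
}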

@@ -12,9 +12,9 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.dsl

- import doobie.{Read, ConnectionIO}
- import doobie.free.connection
+ import doobie.{Read, ConnectionIO, HPS, HRS}

+ import com.snowplowanalytics.snowplow.rdbloader.config.Config
import com.snowplowanalytics.snowplow.rdbloader.db.{ Statement, Target }

/**
@@ -30,7 +30,7 @@ import com.snowplowanalytics.snowplow.rdbloader.db.{ Statement, Target }
trait DAO[C[_]] {

/** Execute single SQL statement */
- def executeUpdate(sql: Statement): C[Int]
+ def executeUpdate(sql: Statement, purpose: DAO.Purpose): C[Int]

/** Execute query and parse results into `A` */
def executeQuery[A](query: Statement)(implicit A: Read[A]): C[A]
@@ -41,36 +41,49 @@ trait DAO[C[_]] {
/** Execute query and parse results into 0 or one `A` */
def executeQueryOption[A](query: Statement)(implicit A: Read[A]): C[Option[A]]

- /** Rollback the transaction */
- def rollback: C[Unit]

/** Get the DB interpreter */
def target: Target
}

object DAO {

+ sealed trait Purpose
+ object Purpose {
+ case object Loading extends Purpose
+ case object NonLoading extends Purpose
+ }

def apply[F[_]](implicit ev: DAO[F]): DAO[F] = ev

- def connectionIO(dbTarget: Target): DAO[ConnectionIO] = new DAO[ConnectionIO] {
+ def connectionIO(dbTarget: Target, timeouts: Config.Timeouts): DAO[ConnectionIO] = new DAO[ConnectionIO] {
/** Execute single SQL statement (against target in interpreter) */
- def executeUpdate(sql: Statement): ConnectionIO[Int] =
- dbTarget.toFragment(sql).update.run
+ def executeUpdate(sql: Statement, purpose: Purpose): ConnectionIO[Int] = {
+ val timeout = purpose match {
+ case Purpose.Loading => timeouts.loading
+ case Purpose.NonLoading => timeouts.nonLoading
+ }
+ dbTarget.toFragment(sql).execWith {
+ HPS.setQueryTimeout(timeout.toSeconds.toInt).flatMap(_ => HPS.executeUpdate)
+ }
+ }

/** Execute query and parse results into `A` */
def executeQuery[A](query: Statement)(implicit A: Read[A]): ConnectionIO[A] =
- dbTarget.toFragment(query).query[A].unique
+ dbTarget.toFragment(query).execWith {
+ HPS.setQueryTimeout(timeouts.nonLoading.toSeconds.toInt).flatMap(_ => HPS.executeQuery(HRS.getUnique))
+ }

def executeQueryList[A](query: Statement)(implicit A: Read[A]): ConnectionIO[List[A]] =
- dbTarget.toFragment(query).query[A].to[List]
+ dbTarget.toFragment(query).execWith {
+ HPS.setQueryTimeout(timeouts.nonLoading.toSeconds.toInt).flatMap(_ => HPS.executeQuery(HRS.build))
+ }

def executeQueryOption[A](query: Statement)(implicit A: Read[A]): ConnectionIO[Option[A]] =
- dbTarget.toFragment(query).query[A].option
+ dbTarget.toFragment(query).execWith {
+ HPS.setQueryTimeout(timeouts.nonLoading.toSeconds.toInt).flatMap(_ => HPS.executeQuery(HRS.getOption))
+ }

- def rollback: ConnectionIO[Unit] =
- connection.rollback

def target: Target =
dbTarget
}
}
}
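
Config.Timeouts itself is defined in one of the changed files this page does not expand, so its exact shape is not visible here; from the call sites above (timeouts.loading, timeouts.nonLoading, .toSeconds) it is presumably a pair of durations. A sketch under that assumption, with hypothetical wiring values:

import scala.concurrent.duration._

// Assumed shape, inferred from `timeouts.loading.toSeconds` and
// `timeouts.nonLoading.toSeconds` in the interpreter above.
final case class Timeouts(loading: FiniteDuration, nonLoading: FiniteDuration)

// Hypothetical wiring, mirroring Environment further down the diff:
//   DAO.connectionIO(target, Timeouts(loading = 1.hour, nonLoading = 1.minute))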
@@ -28,7 +28,7 @@ import io.sentry.{SentryClient, Sentry, SentryOptions}

import com.snowplowanalytics.snowplow.rdbloader.state.{Control, State}
import com.snowplowanalytics.snowplow.rdbloader.common.S3
- import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig
+ import com.snowplowanalytics.snowplow.rdbloader.config.{CliConfig, Config}
import com.snowplowanalytics.snowplow.rdbloader.db.Target
import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics._
import com.snowplowanalytics.snowplow.rdbloader.utils.SSH
@@ -44,15 +44,16 @@ class Environment[F[_]](cache: Cache[F],
aws: AWS[F],
transaction: Transaction[F, ConnectionIO],
state: State.Ref[F],
- target: Target) {
+ target: Target,
+ timeouts: Config.Timeouts) {
implicit val cacheF: Cache[F] = cache
implicit val loggingF: Logging[F] = logging
implicit val monitoringF: Monitoring[F] = monitoring
implicit val igluF: Iglu[F] = iglu
implicit val awsF: AWS[F] = aws
implicit val transactionF: Transaction[F, ConnectionIO] = transaction

- implicit val daoC: DAO[ConnectionIO] = DAO.connectionIO(target)
+ implicit val daoC: DAO[ConnectionIO] = DAO.connectionIO(target, timeouts)
implicit val loggingC: Logging[ConnectionIO] = logging.mapK(transaction.arrowBack)

def control: Control[F] =
@@ -85,7 +86,7 @@ object Environment {

_ <- SSH.resource(cli.config.storage.sshTunnel)
transaction <- Transaction.interpreter[F](cli.config.storage, blocker)
- } yield new Environment[F](cache, logging, monitoring, iglu, aws, transaction, state, statementer)
+ } yield new Environment[F](cache, logging, monitoring, iglu, aws, transaction, state, statementer, cli.config.timeouts)

def initSentry[F[_]: Logging: Sync](dsn: Option[URI]): Resource[F, Option[SentryClient]] =
dsn match {
@@ -144,9 +144,9 @@ object FolderMonitoring {
readyCheck: Config.Retries,
storageTarget: StorageTarget): F[List[AlertPayload]] = {
val getBatches = for {
- _ <- DAO[C].executeUpdate(DropAlertingTempTable)
- _ <- DAO[C].executeUpdate(CreateAlertingTempTable)
- _ <- DAO[C].executeUpdate(FoldersCopy(loadFrom))
+ _ <- DAO[C].executeUpdate(DropAlertingTempTable, DAO.Purpose.NonLoading)
+ _ <- DAO[C].executeUpdate(CreateAlertingTempTable, DAO.Purpose.NonLoading)
+ _ <- DAO[C].executeUpdate(FoldersCopy(loadFrom), DAO.Purpose.NonLoading)
onlyS3Batches <- DAO[C].executeQueryList[S3.Folder](FoldersMinusManifest)
} yield onlyS3Batches

@@ -134,7 +134,7 @@ object Load {
_ <- DAO[F].target.getLoadStatements(discovery).traverse_ { statement =>
Logging[F].info(statement.title) *>
setLoading(statement.table) *>
- DAO[F].executeUpdate(statement).void
+ DAO[F].executeUpdate(statement, DAO.Purpose.Loading).void
}
_ <- Logging[F].info(s"Folder [${discovery.base}] has been loaded (not committed yet)")
} yield ()
@@ -38,7 +38,7 @@ object TargetCheck {
val retryPolicy = Retry.getRetryPolicy[F](readyCheckConfig)
val fa: F[Unit] = target match {
case _: StorageTarget.Snowflake =>
- Transaction[F, C].run(DAO[C].executeUpdate(Statement.ReadyCheck)).void
+ Transaction[F, C].run(DAO[C].executeUpdate(Statement.ReadyCheck, DAO.Purpose.NonLoading)).void
case _: StorageTarget.Databricks | _: StorageTarget.Redshift =>
Transaction[F, C].run(DAO[C].executeQuery[Int](Statement.ReadyCheck)).void
}
@@ -74,7 +74,7 @@ object PureDAO {
val init: PureDAO = custom(getResult)

def interpreter(results: PureDAO, tgt: Target = DummyTarget): DAO[Pure] = new DAO[Pure] {
- def executeUpdate(sql: Statement): Pure[Int] =
+ def executeUpdate(sql: Statement, purpose: DAO.Purpose): Pure[Int] =
results.executeUpdate(sql)

def executeQuery[A](query: Statement)(implicit A: Read[A]): Pure[A] =
@@ -86,9 +86,6 @@
def executeQueryOption[A](query: Statement)(implicit A: Read[A]): Pure[Option[A]] =
results.executeQuery.asInstanceOf[Statement => Pure[Option[A]]](query)

- def rollback: Pure[Unit] =
- Pure.modify(_.log(PureTransaction.Rollback))

def target: Target = tgt
}

@@ -93,9 +93,6 @@ object Redshift {

def toFragment(statement: Statement): Fragment =
statement match {
- case Statement.Begin => sql"BEGIN"
- case Statement.Commit => sql"COMMIT"
- case Statement.Abort => sql"ABORT"
case Statement.Select1 => sql"SELECT 1"
case Statement.ReadyCheck => sql"SELECT 1"

@@ -93,9 +93,6 @@ object Snowflake {

def toFragment(statement: Statement): Fragment =
statement match {
- case Statement.Begin => sql"BEGIN"
- case Statement.Commit => sql"COMMIT"
- case Statement.Abort => sql"ABORT"
case Statement.Select1 => sql"SELECT 1" // OK
case Statement.ReadyCheck => sql"ALTER WAREHOUSE ${Fragment.const0(warehouse)} RESUME IF SUSPENDED"
