Skip to content

Commit

Permalink
Snowflake Loader: prioritize transformedStage over loadAuthMethod in …
Browse files Browse the repository at this point in the history
…config (close #1061)
  • Loading branch information
istreeter committed Sep 22, 2022
1 parent 4940299 commit c030c70
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ object Loader {
val loading: F[Unit] = backgroundCheck {
for {
start <- Clock[F].instantNow
loadAuth <- AuthService.getLoadAuthMethod[F](config.storage.loadAuthMethod, config.region.name, config.timeouts.loading)
loadAuth <- AuthService.getLoadAuthMethod[F](config.storage.eventsLoadAuthMethod, config.region.name, config.timeouts.loading)
result <- Load.load[F, C](config, setStageC, control.incrementAttempts, discovery, loadAuth)
attempts <- control.getAndResetAttempts
_ <- result match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ sealed trait StorageTarget extends Product with Serializable {
def withAutoCommit: Boolean = false
def connectionUrl: String
def properties: Properties
def loadAuthMethod: StorageTarget.LoadAuthMethod
def eventsLoadAuthMethod: StorageTarget.LoadAuthMethod
def foldersLoadAuthMethod: StorageTarget.LoadAuthMethod
}

object StorageTarget {
Expand Down Expand Up @@ -81,11 +82,11 @@ object StorageTarget {
password: PasswordConfig,
maxError: Int,
sshTunnel: Option[TunnelConfig]) extends StorageTarget {
def driver: String = "com.amazon.redshift.jdbc42.Driver"
override def driver: String = "com.amazon.redshift.jdbc42.Driver"

def connectionUrl: String = s"jdbc:redshift://$host:$port/$database"
override def connectionUrl: String = s"jdbc:redshift://$host:$port/$database"

def properties: Properties = {
override def properties: Properties = {
val props = new Properties()
jdbc.validation match {
case Right(updaters) =>
Expand All @@ -96,7 +97,8 @@ object StorageTarget {
props
}

def loadAuthMethod: LoadAuthMethod = LoadAuthMethod.NoCreds
override def eventsLoadAuthMethod: LoadAuthMethod = LoadAuthMethod.NoCreds
override def foldersLoadAuthMethod: LoadAuthMethod = LoadAuthMethod.NoCreds
}

final case class Databricks(
Expand Down Expand Up @@ -131,6 +133,9 @@ object StorageTarget {
props.put("UserAgentEntry", userAgent)
props
}

override def eventsLoadAuthMethod: LoadAuthMethod = loadAuthMethod
override def foldersLoadAuthMethod: LoadAuthMethod = loadAuthMethod
}

final case class Snowflake(snowflakeRegion: Option[String],
Expand All @@ -147,7 +152,7 @@ object StorageTarget {
jdbcHost: Option[String],
loadAuthMethod: LoadAuthMethod) extends StorageTarget {

def connectionUrl: String =
override def connectionUrl: String =
host match {
case Right(h) =>
s"jdbc:snowflake://$h"
Expand All @@ -156,9 +161,9 @@ object StorageTarget {
throw new IllegalStateException(s"Error deriving host: $e")
}

def sshTunnel: Option[TunnelConfig] = None
override def sshTunnel: Option[TunnelConfig] = None

def properties: Properties = {
override def properties: Properties = {
val props: Properties = new Properties()
props.put("warehouse", warehouse)
props.put("db", database)
Expand All @@ -167,7 +172,7 @@ object StorageTarget {
props
}

def driver: String = "net.snowflake.client.jdbc.SnowflakeDriver"
override def driver: String = "net.snowflake.client.jdbc.SnowflakeDriver"

def host: Either[String, String] = {
// See https://docs.snowflake.com/en/user-guide/jdbc-configure.html#connection-parameters
Expand Down Expand Up @@ -199,6 +204,11 @@ object StorageTarget {
"Snowflake config requires either jdbcHost or both account and region".asLeft
}
}

override def eventsLoadAuthMethod: LoadAuthMethod =
transformedStage.fold(loadAuthMethod)(_ => LoadAuthMethod.NoCreds)
override def foldersLoadAuthMethod: LoadAuthMethod =
folderMonitoringStage.fold(loadAuthMethod)(_ => LoadAuthMethod.NoCreds)
}

object Snowflake {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ object FolderMonitoring {
Logging[F].info("Monitoring shredded folders") *>
sinkFolders[F](folders.since, folders.until, folders.transformerOutput, outputFolder).ifM(
for {
loadAuth <- AuthService.getLoadAuthMethod[F](storageTarget.loadAuthMethod, region, timeouts.loading)
loadAuth <- AuthService.getLoadAuthMethod[F](storageTarget.foldersLoadAuthMethod, region, timeouts.loading)
alerts <- check[F, C](outputFolder, readyCheck, storageTarget, loadAuth)
_ <- alerts.traverse_ { payload =>
val warn = payload.base match {
Expand Down

0 comments on commit c030c70

Please sign in to comment.