Skip to content

Commit

Permalink
RDB Loader: make SSL configuration compatible with native JDBC settin…
Browse files Browse the repository at this point in the history
…gs (close #73)
  • Loading branch information
chuwy committed Jul 6, 2018
1 parent 65d5fa6 commit 11efb9c
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
package com.snowplowanalytics.snowplow.rdbloader
package config

import java.util.Properties

import cats.data._
import cats.implicits._

import io.circe.{Decoder, DecodingFailure, HCursor, Json}
import io.circe._
import io.circe.Decoder._
import io.circe.generic.auto._

Expand Down Expand Up @@ -45,7 +47,6 @@ sealed trait StorageTarget extends Product with Serializable {
def database: String
def schema: String
def port: Int
def sslMode: StorageTarget.SslMode
def username: String
def password: StorageTarget.PasswordConfig

Expand Down Expand Up @@ -119,7 +120,7 @@ object StorageTarget {
host: String,
database: String,
port: Int,
sslMode: SslMode,
jdbc: RedshiftJdbc,
roleArn: String,
schema: String,
username: String,
Expand All @@ -136,17 +137,61 @@ object StorageTarget {
* All possible JDBC according to Redshift documentation, except deprecated
* and authentication-related
*/
private case class RedshiftJdbc(blockingRows: Option[Int],
disableIsValidQuery: Option[Boolean],
dsiLogLevel: Option[Int],
filterLevel: Option[String],
loginTimeout: Option[Int],
loglevel: Option[Int],
socketTimeout: Option[Int],
ssl: Option[Boolean],
sslRootCert: Option[String],
tcpKeepAlive: Option[Boolean],
tcpKeepAliveMinutes: Option[Int])
case class RedshiftJdbc(blockingRows: Option[Int],
disableIsValidQuery: Option[Boolean],
dsiLogLevel: Option[Int],
filterLevel: Option[String],
loginTimeout: Option[Int],
loglevel: Option[Int],
socketTimeout: Option[Int],
ssl: Option[Boolean],
sslMode: Option[String],
sslRootCert: Option[String],
tcpKeepAlive: Option[Boolean],
tcpKeepAliveMinutes: Option[Int]) {
/** Either errors or list of mutators to update the `Properties` object */
val validation: Either[LoaderError, List[Properties => Unit]] = jdbcEncoder.encodeObject(this).toList.map {
case (property, value) => value.fold(
((_: Properties) => ()).asRight,
b => ((props: Properties) => { props.setProperty(property, b.toString); () }).asRight,
n => n.toInt match {
case Some(num) =>
((props: Properties) => {
props.setProperty(property, num.toString)
()
}).asRight
case None => s"Impossible to apply JDBC property [$property] with value [${value.noSpaces}]".asLeft
},
s => ((props: Properties) => { props.setProperty(property, s); ()}).asRight,
_ => s"Impossible to apply JDBC property [$property] with JSON array".asLeft,
_ => s"Impossible to apply JDBC property [$property] with JSON object".asLeft
)
} traverse(_.toValidatedNel) match {
case Validated.Valid(updaters) => updaters.asRight[LoaderError]
case Validated.Invalid(errors) =>
val messages = "Invalid JDBC options: " ++ errors.toList.mkString(", ")
val error: LoaderError = LoaderError.DecodingError(messages)
error.asLeft[List[Properties => Unit]]
}
}

object RedshiftJdbc {
val empty = RedshiftJdbc(None, None, None, None, None, None, None, None, None, None, None, None)
}

implicit val jdbcDecoder: Decoder[RedshiftJdbc] =
Decoder.forProduct12("BlockingRowsMode", "DisableIsValidQuery", "DSILogLevel",
"FilterLevel", "loginTimeout", "loglevel", "socketTimeout", "ssl", "sslMode",
"sslRootCert", "tcpKeepAlive", "TCPKeepAliveMinutes")(RedshiftJdbc.apply)

implicit val jdbcEncoder: ObjectEncoder[RedshiftJdbc] =
Encoder.forProduct12("BlockingRowsMode", "DisableIsValidQuery", "DSILogLevel",
"FilterLevel", "loginTimeout", "loglevel", "socketTimeout", "ssl", "sslMode",
"sslRootCert", "tcpKeepAlive", "TCPKeepAliveMinutes")((j: RedshiftJdbc) =>
(j.blockingRows, j.disableIsValidQuery, j.dsiLogLevel,
j.filterLevel, j.loginTimeout, j.loglevel, j.socketTimeout, j.ssl, j.sslMode,
j.sslRootCert, j.tcpKeepAlive, j.tcpKeepAliveMinutes))


/** Reference to encrypted entity inside EC2 Parameter Store */
case class ParameterStoreConfig(parameterName: String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class RealWorldInterpreter private[interpreters](

// dbConnection is Either because not required for log dump
// lazy to wait before tunnel established
private lazy val dbConnection = PgInterpreter.getConnection(cliConfig.target)
private lazy val dbConnection = JdbcInterpreter.getConnection(cliConfig.target)

lazy val manifest =
ManifestInterpreter.initialize(cliConfig.target.processingManifest, cliConfig.configYaml.aws.s3.region, utils.Common.DefaultResolver) match {
Expand Down Expand Up @@ -129,20 +129,20 @@ class RealWorldInterpreter private[interpreters](

val result = for {
conn <- dbConnection
res <- PgInterpreter.executeUpdate(conn)(query)
res <- JdbcInterpreter.executeUpdate(conn)(query)
} yield res
result.asInstanceOf[Id[A]]
case CopyViaStdin(files, query) =>
for {
conn <- dbConnection
_ = log(s"Copying ${files.length} files via stdin")
res <- PgInterpreter.copyViaStdin(conn, files, query)
res <- JdbcInterpreter.copyViaStdin(conn, files, query)
} yield res

case ExecuteQuery(query, d) =>
for {
conn <- dbConnection
res <- PgInterpreter.executeQuery(conn)(query)(d)
res <- JdbcInterpreter.executeQuery(conn)(query)(d)
} yield res

case CreateTmpDir =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import db.Decoder
import config.StorageTarget
import loaders.Common.SqlString

object PgInterpreter {
object JdbcInterpreter {

/**
* Execute a single update-statement in provided Postgres connection
Expand Down Expand Up @@ -102,10 +102,6 @@ object PgInterpreter {
case NonFatal(e) => Left(StorageTargetError(e.toString))
}

def countRows(conn: Connection, queryStatement: String): Either[LoaderError, Int] = {
???
}

/**
* Get Redshift or Postgres connection
*/
Expand All @@ -120,21 +116,24 @@ object PgInterpreter {
val props = new Properties()
props.setProperty("user", target.username)
props.setProperty("password", password)
props.setProperty("tcpKeepAlive", "true")

target match {
case r: StorageTarget.RedshiftConfig =>
val url = s"jdbc:redshift://${target.host}:${target.port}/${target.database}"
if (r.sslMode == StorageTarget.Disable) { // "disable" and "require" are not supported
props.setProperty("ssl", "false") // by native Redshift JDBC Driver
} else { // http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-options.html
props.setProperty("ssl", "true")
}
Right(new RedshiftDriver().connect(url, props))

case _: StorageTarget.PostgresqlConfig =>
val url = s"jdbc:postgresql://${target.host}:${target.port}/${target.database}"
props.setProperty("sslmode", target.sslMode.asProperty)
for {
_ <- r.jdbc.validation match {
case Left(error) => error.asLeft
case Right(propertyUpdaters) =>
propertyUpdaters.foreach(f => f(props)).asRight
}
connection <- Either.catchNonFatal(new RedshiftDriver().connect(url, props)).leftMap { x =>
LoaderError.StorageTargetError(x.getMessage)
}
} yield connection

case p: StorageTarget.PostgresqlConfig =>
val url = s"jdbc:postgresql://${p.host}:${p.port}/${p.database}"
props.setProperty("sslmode", p.sslMode.asProperty)
Right(new PgDriver().connect(url, props))
}
} catch {
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/resolver.json.base64
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ewogICJzY2hlbWEiOiAiaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3MuaWdsdS9yZXNvbHZlci1jb25maWcvanNvbnNjaGVtYS8xLTAtMSIsCiAgImRhdGEiOiB7CiAgICAiY2FjaGVTaXplIjogNTAwLAogICAgInJlcG9zaXRvcmllcyI6IFsKICAgICAgewogICAgICAgICJuYW1lIjogIklnbHUgQ2VudHJhbCIsCiAgICAgICAgInByaW9yaXR5IjogMCwKICAgICAgICAidmVuZG9yUHJlZml4ZXMiOiBbICJjb20uc25vd3Bsb3dhbmFseXRpY3MiIF0sCiAgICAgICAgImNvbm5lY3Rpb24iOiB7CiAgICAgICAgICAiaHR0cCI6IHsKICAgICAgICAgICAgInVyaSI6ICJodHRwOi8vaWdsdWNlbnRyYWwuY29tIgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfQogICAgXQogIH0KfQo=
ew0KICAic2NoZW1hIjogImlnbHU6Y29tLnNub3dwbG93YW5hbHl0aWNzLmlnbHUvcmVzb2x2ZXItY29uZmlnL2pzb25zY2hlbWEvMS0wLTEiLA0KICAiZGF0YSI6IHsNCiAgICAiY2FjaGVTaXplIjogNTAwLA0KICAgICJyZXBvc2l0b3JpZXMiOiBbDQogICAgICB7DQogICAgICAgICJuYW1lIjogIklnbHUgQ2VudHJhbCIsDQogICAgICAgICJwcmlvcml0eSI6IDAsDQogICAgICAgICJ2ZW5kb3JQcmVmaXhlcyI6IFsgImNvbS5zbm93cGxvd2FuYWx5dGljcyIgXSwNCiAgICAgICAgImNvbm5lY3Rpb24iOiB7DQogICAgICAgICAgImh0dHAiOiB7DQogICAgICAgICAgICAidXJpIjogImh0dHA6Ly9pZ2x1Y2VudHJhbC5jb20iDQogICAgICAgICAgfQ0KICAgICAgICB9DQogICAgICB9LA0KICAgICAgew0KICAgICAgICAibmFtZSI6ICJFbWJlZGRlZCBUZXN0IiwNCiAgICAgICAgInByaW9yaXR5IjogMCwNCiAgICAgICAgInZlbmRvclByZWZpeGVzIjogWyAiY29tLnNub3dwbG93YW5hbHl0aWNzIiBdLA0KICAgICAgICAiY29ubmVjdGlvbiI6IHsNCiAgICAgICAgICAiZW1iZWRkZWQiOiB7DQogICAgICAgICAgICAicGF0aCI6ICIvZW1iZWQiDQogICAgICAgICAgfQ0KICAgICAgICB9DQogICAgICB9DQogICAgXQ0KICB9DQp9DQo=
2 changes: 1 addition & 1 deletion src/test/resources/valid-redshift.json.base64
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ew0KICAgICJzY2hlbWEiOiAiaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3Muc25vd3Bsb3cuc3RvcmFnZS9yZWRzaGlmdF9jb25maWcvanNvbnNjaGVtYS8yLTAtMCIsDQogICAgImRhdGEiOiB7DQogICAgICAgICJuYW1lIjogIkFXUyBSZWRzaGlmdCBlbnJpY2hlZCBldmVudHMgc3RvcmFnZSIsDQogICAgICAgICJpZCI6ICJlMTdjMGRlZDAtZWVlNy00ODQ1LWE3ZTYtOGZkYzg4ZDU5OWQwIiwNCiAgICAgICAgImhvc3QiOiAiYW5na29yLXdhdC1maW5hbC5jY3h2ZHB6MDF4bnIudXMtZWFzdC0xLnJlZHNoaWZ0LmFtYXpvbmF3cy5jb20iLA0KICAgICAgICAiZGF0YWJhc2UiOiAic25vd3Bsb3ciLA0KICAgICAgICAicG9ydCI6IDU0MzksDQogICAgICAgICJzc2xNb2RlIjogIkRJU0FCTEUiLA0KICAgICAgICAidXNlcm5hbWUiOiAiYWRtaW4iLA0KICAgICAgICAicGFzc3dvcmQiOiAiU3VwZXJzZWNyZXQxIiwNCiAgICAgICAgInNjaGVtYSI6ICJhdG9taWMiLA0KICAgICAgICAicm9sZUFybiI6ICJhcm46YXdzOmlhbTo6MTIzNDU2Nzg5ODc2OnJvbGUvUmVkc2hpZnRMb2FkUm9sZSIsDQogICAgICAgICJtYXhFcnJvciI6IDEsDQogICAgICAgICJjb21wUm93cyI6IDIwMDAwLA0KICAgICAgICAicHVycG9zZSI6ICJFTlJJQ0hFRF9FVkVOVFMiDQogICAgfQ0KfQ==
ew0KCSJzY2hlbWEiOiAiaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3Muc25vd3Bsb3cuc3RvcmFnZS9yZWRzaGlmdF9jb25maWcvanNvbnNjaGVtYS8zLTAtMCIsDQoJImRhdGEiOiB7DQoJCSJuYW1lIjogIkFXUyBSZWRzaGlmdCBlbnJpY2hlZCBldmVudHMgc3RvcmFnZSIsDQoJCSJpZCI6ICJlMTdjMGRlZDAtZWVlNy00ODQ1LWE3ZTYtOGZkYzg4ZDU5OWQwIiwNCgkJImhvc3QiOiAiYW5na29yLXdhdC1maW5hbC5jY3h2ZHB6MDF4bnIudXMtZWFzdC0xLnJlZHNoaWZ0LmFtYXpvbmF3cy5jb20iLA0KCQkiZGF0YWJhc2UiOiAic25vd3Bsb3ciLA0KCQkicG9ydCI6IDU0MzksDQoJCSJqZGJjIjogew0KCQkJInNzbCI6IGZhbHNlDQoJCX0sDQoJCSJ1c2VybmFtZSI6ICJhZG1pbiIsDQoJCSJwYXNzd29yZCI6ICJTdXBlcnNlY3JldDEiLA0KCQkic2NoZW1hIjogImF0b21pYyIsDQoJCSJyb2xlQXJuIjogImFybjphd3M6aWFtOjoxMjM0NTY3ODk4NzY6cm9sZS9SZWRzaGlmdExvYWRSb2xlIiwNCgkJInByb2Nlc3NpbmdNYW5pZmVzdCI6IG51bGwsDQoJCSJzc2hUdW5uZWwiOiBudWxsLA0KCQkibWF4RXJyb3IiOiAxLA0KCQkiY29tcFJvd3MiOiAyMDAwMCwNCgkJInB1cnBvc2UiOiAiRU5SSUNIRURfRVZFTlRTIg0KCX0NCn0=
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,16 @@ object SpecHelpers {
Storage(StorageVersions(Semver(0,12,0,Some(ReleaseCandidate(4))),Semver(0,1,0,None))),
Monitoring(Map(),Logging(DebugLevel),Some(SnowplowMonitoring(Some(GetMethod),Some("batch-pipeline"),Some("snplow.acme.com")))))

val disableSsl = StorageTarget.RedshiftJdbc.empty.copy(ssl = Some(false))
val enableSsl = StorageTarget.RedshiftJdbc.empty.copy(ssl = Some(true))

val validTarget = StorageTarget.RedshiftConfig(
"e17c0ded0-eee7-4845-a7e6-8fdc88d599d0",
"AWS Redshift enriched events storage",
"angkor-wat-final.ccxvdpz01xnr.us-east-1.redshift.amazonaws.com",
"snowplow",
5439,
StorageTarget.Disable,
disableSsl,
"arn:aws:iam::123456789876:role/RedshiftLoadRole",
"atomic",
"admin",
Expand Down
Loading

0 comments on commit 11efb9c

Please sign in to comment.