Common: allow tabular blacklisting (close #156)
chuwy committed Feb 28, 2020
1 parent 79a9de2 commit 780b999
Showing 32 changed files with 555 additions and 314 deletions.
3 changes: 2 additions & 1 deletion build.sbt
@@ -26,7 +26,8 @@ lazy val common = project.in(file("common"))
Dependencies.circeGeneric,
Dependencies.circeGenericExtra,
Dependencies.circeLiteral,
Dependencies.schemaDdl
Dependencies.schemaDdl,
Dependencies.specs2
)
)

common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/Flattening.scala
@@ -12,7 +12,9 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.common

import io.circe.Json
import io.circe.{ Json, Encoder }
import io.circe.syntax._
import io.circe.literal._

import cats.Monad
import cats.data.EitherT
@@ -24,10 +26,10 @@ import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.ClientError.ResolutionError
import com.snowplowanalytics.iglu.client.ClientError

import com.snowplowanalytics.iglu.schemaddl.IgluSchema
import com.snowplowanalytics.iglu.schemaddl.migrations.Migration.OrderedSchemas
import com.snowplowanalytics.iglu.schemaddl.migrations.{ SchemaList => DdlSchemaList }
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._

@@ -37,27 +39,39 @@ object Flattening {
* `SchemaList` is unavailable (in case no Iglu Server hosts these schemas)
* A particular schema could not be fetched, thus the whole flattening algorithm cannot be built
*/
sealed trait FlatteningError
sealed trait FlatteningError extends Product with Serializable

object FlatteningError {
case class SchemaListResolution(error: ResolutionError) extends FlatteningError
case class SchemaResolution(error: ResolutionError) extends FlatteningError
case class Parsing(error: String) extends FlatteningError
}
final case class SchemaListResolution(resolutionError: ClientError.ResolutionError, criterion: SchemaCriterion) extends FlatteningError
final case class SchemaResolution(resolutionError: ClientError.ResolutionError, schema: SchemaKey) extends FlatteningError
final case class Parsing(parsingError: String) extends FlatteningError

// Cache = Map[SchemaKey, OrderedSchemas]
implicit val flatteningErrorCirceEncoder: Encoder[FlatteningError] =
Encoder.instance {
case SchemaListResolution(error, criterion) =>
json"""{"resolutionError": ${error: ClientError}, "criterion": ${criterion.asString}, "type": "SchemaListResolution"}"""
case SchemaResolution(error, schema) =>
json"""{"resolutionError": ${error: ClientError}, "schema": ${schema.toSchemaUri}, "type": "SchemaResolution"}"""
case Parsing(error) =>
json"""{"parsingError": $error, "type": "Parsing"}"""
}
}
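For illustration, a minimal sketch (not part of this commit) of what the encoder above produces; the `"type"` field acts as a discriminator between error variants:

```scala
import io.circe.syntax._

// Hypothetical usage of the encoder above: each variant serializes to a
// JSON object tagged with a "type" discriminator
val error: FlatteningError = FlatteningError.Parsing("cannot parse self-describing schema")
error.asJson.noSpaces
// {"parsingError":"cannot parse self-describing schema","type":"Parsing"}
```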

def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], key: SchemaKey): EitherT[F, FlatteningError, OrderedSchemas] =
def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], key: SchemaKey): EitherT[F, FlatteningError, DdlSchemaList] =
getOrdered(resolver, key.vendor, key.name, key.version.model)

def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], vendor: String, name: String, model: Int): EitherT[F, FlatteningError, OrderedSchemas] =
def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], vendor: String, name: String, model: Int): EitherT[F, FlatteningError, DdlSchemaList] = {
val criterion = SchemaCriterion(vendor, name, "jsonschema", Some(model), None, None)
val schemaList = resolver.listSchemas(vendor, name, Some(model))
for {
schemaList <- EitherT[F, ResolutionError, SchemaList](resolver.listSchemas(vendor, name, Some(model))).leftMap(FlatteningError.SchemaListResolution)
ordered <- OrderedSchemas.fromSchemaList(schemaList, fetch(resolver))
schemaList <- EitherT[F, ClientError.ResolutionError, SchemaList](schemaList).leftMap(error => FlatteningError.SchemaListResolution(error, criterion))
ordered <- DdlSchemaList.fromSchemaList(schemaList, fetch(resolver))
} yield ordered
}

def fetch[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F])(key: SchemaKey): EitherT[F, FlatteningError, IgluSchema] =
for {
json <- EitherT(resolver.lookupSchema(key, 2)).leftMap(FlatteningError.SchemaResolution)
json <- EitherT(resolver.lookupSchema(key)).leftMap(error => FlatteningError.SchemaResolution(error, key))
schema <- EitherT.fromEither(parseSchema(json))
} yield schema
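As a usage sketch (assumptions: `cats.effect.IO` as the effect type with `RegistryLookup` and `Clock` instances in scope, a `resolver: Resolver[IO]`, and `DdlSchemaList` being the alias imported at the top of this file):

```scala
import cats.effect.IO
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

// Hypothetical call site: fetch the ordered schema list for model 1 of
// com.acme/checkout; a Left carries a JSON-encodable FlatteningError
val key = SchemaKey("com.acme", "checkout", "jsonschema", SchemaVer.Full(1, 0, 0))
val ordered: IO[Either[FlatteningError, DdlSchemaList]] =
  Flattening.getOrdered[IO](resolver, key).value
```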

common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/StorageTarget.scala
@@ -10,28 +10,22 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader
package config
package com.snowplowanalytics.snowplow.rdbloader.common

import java.util.Properties

import cats.Id
import cats.data._
import cats.implicits._

import io.circe._
import io.circe.parser.parse
import io.circe.Decoder._
import io.circe.generic.auto._

import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.instances._
import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.core.circe.instances._
import com.snowplowanalytics.iglu.core.{SelfDescribingData, SchemaCriterion}

// This project
import LoaderError._
import utils.Common._

import io.circe._
import io.circe.Decoder._
import io.circe.generic.semiauto._
import io.circe.parser.parse

/**
* Common configuration for JDBC target, such as Redshift and Postgres
@@ -49,25 +43,29 @@ sealed trait StorageTarget extends Product with Serializable {

def processingManifest: Option[StorageTarget.ProcessingManifestConfig]

def eventsTable: String =
loaders.Common.getEventsTable(schema)

def shreddedTable(tableName: String): String =
s"$schema.$tableName"

def sshTunnel: Option[StorageTarget.TunnelConfig]

def blacklistTabular: Option[List[SchemaCriterion]] // None means tabular is disabled
}
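To make the new field's semantics concrete, a hypothetical helper (not part of this commit) showing how a loader could consult `blacklistTabular`:

```scala
import com.snowplowanalytics.iglu.core.SchemaKey

// Hypothetical helper: a schema is loaded in tabular form unless it matches
// one of the blacklisted criteria; None disables tabular output entirely
def isTabular(target: StorageTarget)(key: SchemaKey): Boolean =
  target.blacklistTabular match {
    case Some(criteria) => !criteria.exists(_.matches(key))
    case None           => false
  }
```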

object StorageTarget {

case class ParseError(message: String) extends AnyVal

sealed trait SslMode extends StringEnum { def asProperty = asString.toLowerCase.replace('_', '-') }
case object Disable extends SslMode { def asString = "DISABLE" }
case object Require extends SslMode { def asString = "REQUIRE" }
case object VerifyCa extends SslMode { def asString = "VERIFY_CA" }
case object VerifyFull extends SslMode { def asString = "VERIFY_FULL" }

implicit val sslModeDecoder: Decoder[SslMode] =
decodeStringEnum[SslMode]
StringEnum.decodeStringEnum[SslMode] // TODO: tests fail if it is in object

object SslMode {
case object Disable extends SslMode { def asString = "DISABLE" }
case object Require extends SslMode { def asString = "REQUIRE" }
case object VerifyCa extends SslMode { def asString = "VERIFY_CA" }
case object VerifyFull extends SslMode { def asString = "VERIFY_FULL" }
}

/**
* Configuration to access Snowplow Processing Manifest
@@ -77,6 +75,9 @@ object StorageTarget {

object ProcessingManifestConfig {
case class AmazonDynamoDbConfig(tableName: String)

implicit val amazonDynamoDbConfigDecoder: Decoder[AmazonDynamoDbConfig] =
deriveDecoder[AmazonDynamoDbConfig]
}

/**
@@ -93,7 +94,8 @@ object StorageTarget {
username: String,
password: PasswordConfig,
sshTunnel: Option[TunnelConfig],
processingManifest: Option[ProcessingManifestConfig])
processingManifest: Option[ProcessingManifestConfig],
blacklistTabular: Option[List[SchemaCriterion]])
extends StorageTarget

/**
@@ -113,7 +115,8 @@ object StorageTarget {
maxError: Int,
compRows: Long,
sshTunnel: Option[TunnelConfig],
processingManifest: Option[ProcessingManifestConfig])
processingManifest: Option[ProcessingManifestConfig],
blacklistTabular: Option[List[SchemaCriterion]])
extends StorageTarget

/**
@@ -133,7 +136,7 @@ object StorageTarget {
tcpKeepAlive: Option[Boolean],
tcpKeepAliveMinutes: Option[Int]) {
/** Either errors or list of mutators to update the `Properties` object */
val validation: Either[LoaderError, List[Properties => Unit]] = jdbcEncoder.encodeObject(this).toList.map {
val validation: Either[ParseError, List[Properties => Unit]] = jdbcEncoder.encodeObject(this).toList.map {
case (property, value) => value.fold(
((_: Properties) => ()).asRight,
b => ((props: Properties) => { props.setProperty(property, b.toString); () }).asRight,
@@ -150,10 +153,10 @@ object StorageTarget {
_ => s"Impossible to apply JDBC property [$property] with JSON object".asLeft
)
} traverse(_.toValidatedNel) match {
case Validated.Valid(updaters) => updaters.asRight[LoaderError]
case Validated.Valid(updaters) => updaters.asRight[ParseError]
case Validated.Invalid(errors) =>
val messages = "Invalid JDBC options: " ++ errors.toList.mkString(", ")
val error: LoaderError = LoaderError.ConfigError(messages)
val error: ParseError = ParseError(messages)
error.asLeft[List[Properties => Unit]]
}
}
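For illustration, a short sketch of applying the validated mutators (assuming a value `jdbc` of the JDBC options class above):

```scala
// Hypothetical usage: fold the validated mutators into a java.util.Properties
// that can be passed to a JDBC driver
val props = new java.util.Properties()
jdbc.validation match {
  case Right(updaters)  => updaters.foreach(update => update(props))
  case Left(parseError) => sys.error(parseError.message)
}
```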
@@ -175,7 +178,6 @@ object StorageTarget {
j.filterLevel, j.loginTimeout, j.loglevel, j.socketTimeout, j.ssl, j.sslMode,
j.sslRootCert, j.tcpKeepAlive, j.tcpKeepAliveMinutes))


/** Reference to encrypted entity inside EC2 Parameter Store */
case class ParameterStoreConfig(parameterName: String)

@@ -226,11 +228,11 @@ object StorageTarget {
* @param validJson JSON that is presumably self-describing storage target configuration
* @return validated entity of `StorageTarget` ADT if success
*/
def decodeStorageTarget(validJson: SelfDescribingData[Json]): Either[ConfigError, StorageTarget] =
def decodeStorageTarget(validJson: SelfDescribingData[Json]): Either[ParseError, StorageTarget] =
(validJson.schema.name, validJson.data) match {
case ("redshift_config", data) => data.as[RedshiftConfig].leftMap(e => ConfigError(e.show))
case ("postgresql_config", data) => data.as[PostgresqlConfig].leftMap(e => ConfigError(e.show))
case (name, _) => ConfigError(s"Unsupported storage target [$name]").asLeft
case ("redshift_config", data) => data.as[RedshiftConfig].leftMap(e => ParseError(e.show))
case ("postgresql_config", data) => data.as[PostgresqlConfig].leftMap(e => ParseError(e.show))
case (name, _) => ParseError(s"Unsupported storage target [$name]").asLeft
}

/**
@@ -241,21 +243,53 @@ object StorageTarget {
* @return valid `StorageTarget` OR
* non-empty list of errors (such as validation or parse errors)
*/
def parseTarget(client: Client[Id, Json], target: String): Either[ConfigError, StorageTarget] =
def parseTarget(client: Client[Id, Json], target: String): Either[ParseError, StorageTarget] =
parse(target)
.leftMap(e => ConfigError(e.show))
.flatMap(json => SelfDescribingData.parse(json).leftMap(e => ConfigError(s"Not a self-describing JSON, ${e.code}")))
.leftMap(e => ParseError(e.show))
.flatMap(json => SelfDescribingData.parse(json).leftMap(e => ParseError(s"Not a self-describing JSON, ${e.code}")))
.flatMap(payload => validate(client)(payload))
.flatMap(decodeStorageTarget)
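A minimal end-to-end sketch, assuming an Iglu `Client[Id, Json]` named `igluClient` and a raw configuration string `targetJson` (both hypothetical names):

```scala
// Hypothetical call site: parse the string, validate it against its Iglu
// schema, and decode it into the StorageTarget ADT in one pass
val target: Either[StorageTarget.ParseError, StorageTarget] =
  StorageTarget.parseTarget(igluClient, targetJson)
```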

implicit val postgresqlConfigDecoder: Decoder[PostgresqlConfig] =
deriveDecoder[PostgresqlConfig]

implicit val redshiftConfigDecoder: Decoder[RedshiftConfig] =
deriveDecoder[RedshiftConfig]

implicit val encryptedConfigDecoder: Decoder[EncryptedConfig] =
deriveDecoder[EncryptedConfig]

implicit val tunnelConfigDecoder: Decoder[TunnelConfig] =
deriveDecoder[TunnelConfig]

implicit val bastionConfigDecoder: Decoder[BastionConfig] =
deriveDecoder[BastionConfig]

implicit val destinationConfigDecoder: Decoder[DestinationConfig] =
deriveDecoder[DestinationConfig]

implicit val parameterStoreConfigDecoder: Decoder[ParameterStoreConfig] =
deriveDecoder[ParameterStoreConfig]

implicit val passwordConfigDecoder: Decoder[PasswordConfig] =
deriveDecoder[PasswordConfig]

implicit val processingManifestConfigDecoder: Decoder[ProcessingManifestConfig] =
deriveDecoder[ProcessingManifestConfig]

implicit val schemaCriterionConfigDecoder: Decoder[SchemaCriterion] =
Decoder.decodeString.emap {
s => SchemaCriterion.parse(s).toRight(s"Cannot parse [$s] as Iglu SchemaCriterion, it must have iglu:vendor/name/format/1-*-* format")
}
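For example, a sketch of the accepted string form (the same form used inside a target's `blacklistTabular` array):

```scala
import io.circe.Json

// Hypothetical round-trip: a full Iglu URI with wildcarded version parts...
schemaCriterionConfigDecoder.decodeJson(Json.fromString("iglu:com.acme/checkout/jsonschema/1-*-*"))
// ...should decode to SchemaCriterion("com.acme", "checkout", "jsonschema", Some(1), None, None)
```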

/**
* Validate self-describing storage target configuration against its schema with Iglu client
*
* @param client Iglu resolver and validator
* @param json self-describing JSON with storage target configuration
* @return same JSON if it is valid against its schema
*/
private def validate(client: Client[Id, Json])(json: SelfDescribingData[Json]): Either[ConfigError, SelfDescribingData[Json]] = {
client.check(json).value.leftMap(e => ConfigError(e.show)).as(json)
private def validate(client: Client[Id, Json])(json: SelfDescribingData[Json]): Either[ParseError, SelfDescribingData[Json]] = {
client.check(json).value.leftMap(e => ParseError(s"${json.schema.toSchemaUri} ${e.show}")).as(json)
}
}
common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/StringEnum.scala
@@ -0,0 +1,89 @@
package com.snowplowanalytics.snowplow.rdbloader.common

import scala.reflect.runtime.universe._
import scala.reflect.runtime.{universe => ru}

import cats.implicits._

import io.circe._

/**
* Common trait for all ADTs that can be read from a string.
* Must be extended by a sealed hierarchy containing only singletons.
* Used by `decodeStringEnum` to get a runtime representation of the whole ADT
*/
trait StringEnum {
/**
* Input string representation.
* It should be used only to help read a `StringEnum` from a string
* and never the other way round, such as rendering a value into a SQL statement
*/
def asString: String
}

object StringEnum {
/**
* Derive a circe decoder for an ADT extending `StringEnum`
*
* @tparam A sealed hierarchy
* @return circe decoder for ADT `A`
*/
def decodeStringEnum[A <: StringEnum: TypeTag]: Decoder[A] =
Decoder.instance(parseEnum[A])

/**
* Parse an element of a `StringEnum` sealed hierarchy from a circe AST
*
* @param hCursor parser's cursor
* @tparam A sealed hierarchy
* @return either the decoded element or a decoding failure
*/
private def parseEnum[A <: StringEnum: TypeTag](hCursor: HCursor): Decoder.Result[A] = {
for {
string <- hCursor.as[String]
method = fromString[A](string)
result <- method.asDecodeResult(hCursor)
} yield result
}

/**
* Parse an element of a `StringEnum` sealed hierarchy from a String
*
* @param string line containing the `asString` representation of a `StringEnum`
* @tparam A sealed hierarchy
* @return either an error message or the parsed element
*/
def fromString[A <: StringEnum: TypeTag](string: String): Either[String, A] = {
val map = sealedDescendants[A].map { o => (o.asString, o) }.toMap
map.get(string) match {
case Some(a) => Right(a)
case None => Left(s"Unknown ${typeOf[A].typeSymbol.name.toString} [$string]")
}
}

/**
* Get all objects extending some sealed hierarchy
* @tparam Root some sealed trait with object descendants
* @return whole set of objects
*/
def sealedDescendants[Root: TypeTag]: Set[Root] = {
val symbol = typeOf[Root].typeSymbol
val internal = symbol.asInstanceOf[scala.reflect.internal.Symbols#Symbol]
val descendants = if (internal.isSealed)
Some(internal.sealedDescendants.map(_.asInstanceOf[Symbol]) - symbol)
else None
descendants.getOrElse(Set.empty).map(x => getCaseObject(x).asInstanceOf[Root])
}

private val m = ru.runtimeMirror(getClass.getClassLoader)

/**
* Reflection method to get runtime object by compiler's `Symbol`
* @param desc compiler runtime `Symbol`
* @return "real" scala case object
*/
private def getCaseObject(desc: Symbol): Any = {
val mod = m.staticModule(desc.asClass.fullName)
m.reflectModule(mod).instance
}
}
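A brief usage sketch (assuming the `SslMode` hierarchy from StorageTarget.scala): reflection collects every sealed descendant and keys it by its `asString` form, so only that exact spelling decodes:

```scala
import io.circe.{Decoder, Json}

// Hypothetical check of the derived decoder
val sslModeDecoder: Decoder[StorageTarget.SslMode] =
  StringEnum.decodeStringEnum[StorageTarget.SslMode]

sslModeDecoder.decodeJson(Json.fromString("VERIFY_CA")) // Right(SslMode.VerifyCa)
sslModeDecoder.decodeJson(Json.fromString("verify-ca")) // Left(...): only the asString form is accepted
```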