Skip to content

Commit

Permalink
Common: allow tabular blacklisting (close #156)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Jun 25, 2019
1 parent 28d0a26 commit e209b4b
Show file tree
Hide file tree
Showing 30 changed files with 440 additions and 261 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ lazy val common = project.in(file("common"))
Dependencies.circeGeneric,
Dependencies.circeGenericExtra,
Dependencies.circeLiteral,
Dependencies.schemaDdl
Dependencies.schemaDdl,
Dependencies.specs2
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.common

import io.circe.Json
import io.circe.{ Json, Encoder }
import io.circe.syntax._

import cats.Monad
import cats.data.EitherT
Expand All @@ -24,7 +25,7 @@ import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.ClientError.ResolutionError
import com.snowplowanalytics.iglu.client.ClientError

import com.snowplowanalytics.iglu.schemaddl.IgluSchema
import com.snowplowanalytics.iglu.schemaddl.migrations.Migration.OrderedSchemas
Expand All @@ -38,10 +39,18 @@ object Flattening {
* Particular schema could not be fetched, thus whole flattening algorithm cannot be built
*/
sealed trait FlatteningError

object FlatteningError {
case class SchemaListResolution(error: ResolutionError) extends FlatteningError
case class SchemaResolution(error: ResolutionError) extends FlatteningError
case class Parsing(error: String) extends FlatteningError
final case class SchemaListResolution(error: ClientError.ResolutionError) extends FlatteningError
final case class SchemaResolution(error: ClientError.ResolutionError) extends FlatteningError
final case class Parsing(error: String) extends FlatteningError

implicit val flatteningErrorCirceEncoder: Encoder[FlatteningError] =
Encoder.instance {
case SchemaListResolution(error: ClientError) => (error: ClientError).asJson
case SchemaResolution(error) => (error: ClientError).asJson
case Parsing(error) => error.asJson
}
}

// Cache = Map[SchemaKey, OrderedSchemas]
Expand All @@ -51,7 +60,7 @@ object Flattening {

def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], vendor: String, name: String, model: Int): EitherT[F, FlatteningError, OrderedSchemas] =
for {
schemaList <- EitherT[F, ResolutionError, SchemaList](resolver.listSchemas(vendor, name, Some(model))).leftMap(FlatteningError.SchemaListResolution)
schemaList <- EitherT[F, ClientError.ResolutionError, SchemaList](resolver.listSchemas(vendor, name, Some(model))).leftMap(FlatteningError.SchemaListResolution)
ordered <- OrderedSchemas.fromSchemaList(schemaList, fetch(resolver))
} yield ordered

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,22 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader
package config
package com.snowplowanalytics.snowplow.rdbloader.common

import java.util.Properties

import cats.Id
import cats.data._
import cats.implicits._

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.core.circe.instances._
import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingData}

import io.circe._
import io.circe.parser.parse
import io.circe.Decoder._
import io.circe.generic.auto._

import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.instances._
import com.snowplowanalytics.iglu.client.Client

// This project
import LoaderError._
import utils.Common._

import io.circe.parser.parse

/**
* Common configuration for JDBC target, such as Redshift and Postgres
Expand All @@ -49,25 +43,29 @@ sealed trait StorageTarget extends Product with Serializable {

def processingManifest: Option[StorageTarget.ProcessingManifestConfig]

def eventsTable: String =
loaders.Common.getEventsTable(schema)

def shreddedTable(tableName: String): String =
s"$schema.$tableName"

def sshTunnel: Option[StorageTarget.TunnelConfig]

def blacklistTabular: Option[List[SchemaKey]] // None means tabular is disabled?
}

object StorageTarget {

case class ParseError(message: String) extends AnyVal

sealed trait SslMode extends StringEnum { def asProperty = asString.toLowerCase.replace('_', '-') }
case object Disable extends SslMode { def asString = "DISABLE" }
case object Require extends SslMode { def asString = "REQUIRE" }
case object VerifyCa extends SslMode { def asString = "VERIFY_CA" }
case object VerifyFull extends SslMode { def asString = "VERIFY_FULL" }

implicit val sslModeDecoder: Decoder[SslMode] =
decodeStringEnum[SslMode]
StringEnum.decodeStringEnum[SslMode] // TODO: tests fail if it is in object

object SslMode {
case object Disable extends SslMode { def asString = "DISABLE" }
case object Require extends SslMode { def asString = "REQUIRE" }
case object VerifyCa extends SslMode { def asString = "VERIFY_CA" }
case object VerifyFull extends SslMode { def asString = "VERIFY_FULL" }
}

/**
* Configuration to access Snowplow Processing Manifest
Expand All @@ -93,7 +91,8 @@ object StorageTarget {
username: String,
password: PasswordConfig,
sshTunnel: Option[TunnelConfig],
processingManifest: Option[ProcessingManifestConfig])
processingManifest: Option[ProcessingManifestConfig],
blacklistTabular: Option[List[SchemaKey]])
extends StorageTarget

/**
Expand All @@ -113,7 +112,8 @@ object StorageTarget {
maxError: Int,
compRows: Long,
sshTunnel: Option[TunnelConfig],
processingManifest: Option[ProcessingManifestConfig])
processingManifest: Option[ProcessingManifestConfig],
blacklistTabular: Option[List[SchemaKey]])
extends StorageTarget

/**
Expand All @@ -133,7 +133,7 @@ object StorageTarget {
tcpKeepAlive: Option[Boolean],
tcpKeepAliveMinutes: Option[Int]) {
/** Either errors or list of mutators to update the `Properties` object */
val validation: Either[LoaderError, List[Properties => Unit]] = jdbcEncoder.encodeObject(this).toList.map {
val validation: Either[ParseError, List[Properties => Unit]] = jdbcEncoder.encodeObject(this).toList.map {
case (property, value) => value.fold(
((_: Properties) => ()).asRight,
b => ((props: Properties) => { props.setProperty(property, b.toString); () }).asRight,
Expand All @@ -150,10 +150,10 @@ object StorageTarget {
_ => s"Impossible to apply JDBC property [$property] with JSON object".asLeft
)
} traverse(_.toValidatedNel) match {
case Validated.Valid(updaters) => updaters.asRight[LoaderError]
case Validated.Valid(updaters) => updaters.asRight[ParseError]
case Validated.Invalid(errors) =>
val messages = "Invalid JDBC options: " ++ errors.toList.mkString(", ")
val error: LoaderError = LoaderError.ConfigError(messages)
val error: ParseError = ParseError(messages)
error.asLeft[List[Properties => Unit]]
}
}
Expand Down Expand Up @@ -226,11 +226,11 @@ object StorageTarget {
* @param validJson JSON that is presumably self-describing storage target configuration
* @return validated entity of `StorageTarget` ADT if success
*/
def decodeStorageTarget(validJson: SelfDescribingData[Json]): Either[ConfigError, StorageTarget] =
def decodeStorageTarget(validJson: SelfDescribingData[Json]): Either[ParseError, StorageTarget] =
(validJson.schema.name, validJson.data) match {
case ("redshift_config", data) => data.as[RedshiftConfig].leftMap(e => ConfigError(e.show))
case ("postgresql_config", data) => data.as[PostgresqlConfig].leftMap(e => ConfigError(e.show))
case (name, _) => ConfigError(s"Unsupported storage target [$name]").asLeft
case ("redshift_config", data) => data.as[RedshiftConfig].leftMap(e => ParseError(e.show))
case ("postgresql_config", data) => data.as[PostgresqlConfig].leftMap(e => ParseError(e.show))
case (name, _) => ParseError(s"Unsupported storage target [$name]").asLeft
}

/**
Expand All @@ -241,10 +241,10 @@ object StorageTarget {
* @return valid `StorageTarget` OR
* non-empty list of errors (such as validation or parse errors)
*/
def parseTarget(client: Client[Id, Json], target: String): Either[ConfigError, StorageTarget] =
def parseTarget(client: Client[Id, Json], target: String): Either[ParseError, StorageTarget] =
parse(target)
.leftMap(e => ConfigError(e.show))
.flatMap(json => SelfDescribingData.parse(json).leftMap(e => ConfigError(s"Not a self-describing JSON, ${e.code}")))
.leftMap(e => ParseError(e.show))
.flatMap(json => SelfDescribingData.parse(json).leftMap(e => ParseError(s"Not a self-describing JSON, ${e.code}")))
.flatMap(payload => validate(client)(payload))
.flatMap(decodeStorageTarget)

Expand All @@ -255,7 +255,7 @@ object StorageTarget {
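* @param json self-describing JSON to check with the Iglu client
* @return the same self-describing JSON if the check succeeds, parse error otherwise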
*/
private def validate(client: Client[Id, Json])(json: SelfDescribingData[Json]): Either[ConfigError, SelfDescribingData[Json]] = {
client.check(json).value.leftMap(e => ConfigError(e.show)).as(json)
private def validate(client: Client[Id, Json])(json: SelfDescribingData[Json]): Either[ParseError, SelfDescribingData[Json]] = {
client.check(json).value.leftMap(e => ParseError(e.show)).leftMap(error => ParseError(s"${json.schema.toSchemaUri} ${error.message}")).as(json)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.snowplowanalytics.snowplow.rdbloader.common

import scala.reflect.runtime.universe._
import scala.reflect.runtime.{universe => ru}

import cats.implicits._

import io.circe._

/**
 * Common trait for all ADTs that can be read from a string.
 * Must be extended by a sealed hierarchy consisting only of singleton objects.
 * Used by `StringEnum.decodeStringEnum` to get a runtime representation of the whole ADT.
 */
trait StringEnum {
/**
 * **IN** string representation.
 * It should be used only to help read a `StringEnum` member from a string
 * and never the other way round, such as rendering the value into an SQL statement.
 */
def asString: String
}

object StringEnum {

  /** Runtime mirror used to resolve singleton objects from reflection symbols */
  private val mirror = ru.runtimeMirror(getClass.getClassLoader)

  /**
   * Derive a circe decoder for an ADT whose members extend `StringEnum`
   *
   * @tparam A sealed hierarchy
   * @return circe decoder for ADT `A`
   */
  def decodeStringEnum[A <: StringEnum: TypeTag]: Decoder[A] =
    Decoder.instance(cursor => parseEnum[A](cursor))

  /**
   * Read a JSON string at the cursor and resolve it to a member of the hierarchy
   *
   * @param hCursor parser's cursor
   * @tparam A sealed hierarchy
   * @return member of `A` whose `asString` equals the JSON string,
   *         or a decoding failure if the value is not a string or matches no member
   */
  private def parseEnum[A <: StringEnum: TypeTag](hCursor: HCursor): Decoder.Result[A] =
    hCursor.as[String].flatMap { raw =>
      fromString[A](raw).asDecodeResult(hCursor)
    }

  /**
   * Resolve a member of a `StringEnum` sealed hierarchy from its string representation
   *
   * @param string line containing `asString` representation of `StringEnum`
   * @tparam A sealed hierarchy
   * @return `Right` with the matching member, or `Left` with an
   *         `Unknown <type name> [<string>]` message when nothing matches
   */
  def fromString[A <: StringEnum: TypeTag](string: String): Either[String, A] = {
    // The lookup table is rebuilt via reflection on every call
    val byRepresentation = sealedDescendants[A].map(member => member.asString -> member).toMap
    byRepresentation
      .get(string)
      .toRight(s"Unknown ${typeOf[A].typeSymbol.name.toString} [$string]")
  }

  /**
   * Get all objects extending some sealed hierarchy.
   * Yields an empty set when `Root` is not sealed.
   *
   * @tparam Root some sealed trait with object descendants
   * @return whole set of objects
   */
  def sealedDescendants[Root: TypeTag]: Set[Root] = {
    val rootSymbol = typeOf[Root].typeSymbol
    // `isSealed` and `sealedDescendants` are only reachable through the internal symbol API
    val internalSymbol = rootSymbol.asInstanceOf[scala.reflect.internal.Symbols#Symbol]
    val children: Set[Symbol] =
      if (internalSymbol.isSealed)
        internalSymbol.sealedDescendants.map(_.asInstanceOf[Symbol]) - rootSymbol // drop the root itself
      else
        Set.empty
    children.map(child => getCaseObject(child).asInstanceOf[Root])
  }

  /**
   * Reflection method to get a runtime object by the compiler's `Symbol`.
   * The symbol is looked up as a static module by its full name.
   *
   * @param desc compiler runtime `Symbol`
   * @return "real" scala case object
   */
  private def getCaseObject(desc: Symbol): Any =
    mirror.reflectModule(mirror.staticModule(desc.asClass.fullName)).instance
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2012-2019 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader

import io.circe._
import cats.Id
import cats.effect.Clock

import scala.concurrent.duration.{ TimeUnit, MILLISECONDS, NANOSECONDS }

package object common {

  /**
   * `Clock` instance for the synchronous `Id` effect.
   *
   * `realTime` is backed by the wall clock (`System.currentTimeMillis`), so it can be
   * interpreted as time since the Unix epoch. `monotonic` is backed by
   * `System.nanoTime`, which has an arbitrary origin and is only meaningful for
   * measuring elapsed time between two readings.
   */
  implicit val catsClockIdInstance: Clock[Id] = new Clock[Id] {
    override def realTime(unit: TimeUnit): Id[Long] =
      unit.convert(System.currentTimeMillis(), MILLISECONDS)

    override def monotonic(unit: TimeUnit): Id[Long] =
      unit.convert(System.nanoTime(), NANOSECONDS)
  }

  /**
   * Syntax extension to transform `Either` with string as failure
   * into circe-appropriate decoder result
   */
  implicit class ParseErrorOps[A](val error: Either[String, A]) extends AnyVal {
    /**
     * Turn the string failure into a `DecodingFailure`, leaving a success untouched
     *
     * @param hCursor cursor whose history is attached to the failure
     * @return the original success, or a `DecodingFailure` carrying the message
     */
    def asDecodeResult(hCursor: HCursor): Decoder.Result[A] =
      error.left.map(message => DecodingFailure(message, hCursor.history))
  }
}
Loading

0 comments on commit e209b4b

Please sign in to comment.