From 95a8db82bbcefafd131b04bc28af6b6d4ba78a46 Mon Sep 17 00:00:00 2001 From: Anton Parkhomenko Date: Wed, 28 Oct 2020 23:52:22 +0300 Subject: [PATCH] Common: bump schema-ddl to 0.12.0 (close #192) --- .../rdbloader/common/StorageTarget.scala | 32 +++++++++---------- .../spark/EventUtils.scala | 2 +- project/Dependencies.scala | 2 +- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/StorageTarget.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/StorageTarget.scala index 3ad476d0a..8aa9f416b 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/StorageTarget.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/StorageTarget.scala @@ -20,7 +20,7 @@ import cats.data._ import cats.implicits._ import com.snowplowanalytics.iglu.client.Client -import com.snowplowanalytics.iglu.core.circe.instances._ +import com.snowplowanalytics.iglu.core.circe.implicits.{ schemaCriterionDecoder => _, _ } import com.snowplowanalytics.iglu.core.{SelfDescribingData, SchemaCriterion} import io.circe._ @@ -104,7 +104,7 @@ object StorageTarget { tcpKeepAlive: Option[Boolean], tcpKeepAliveMinutes: Option[Int]) { /** Either errors or list of mutators to update the `Properties` object */ - val validation: Either[ParseError, List[Properties => Unit]] = jdbcEncoder.encodeObject(this).toList.map { + val validation: Either[ParseError, List[Properties => Unit]] = RedshiftJdbc.jdbcEncoder.encodeObject(this).toList.map { case (property, value) => value.fold( ((_: Properties) => ()).asRight, b => ((props: Properties) => { props.setProperty(property, b.toString); () }).asRight, @@ -131,20 +131,20 @@ object StorageTarget { object RedshiftJdbc { val empty = RedshiftJdbc(None, None, None, None, None, None, None, None, None, None, None, None) - } - - implicit val jdbcDecoder: Decoder[RedshiftJdbc] = - Decoder.forProduct12("BlockingRowsMode", "DisableIsValidQuery", "DSILogLevel", - "FilterLevel", "loginTimeout", "loglevel", "socketTimeout", "ssl", "sslMode", - "sslRootCert", "tcpKeepAlive", "TCPKeepAliveMinutes")(RedshiftJdbc.apply) - implicit val jdbcEncoder: ObjectEncoder[RedshiftJdbc] = - Encoder.forProduct12("BlockingRowsMode", "DisableIsValidQuery", "DSILogLevel", - "FilterLevel", "loginTimeout", "loglevel", "socketTimeout", "ssl", "sslMode", - "sslRootCert", "tcpKeepAlive", "TCPKeepAliveMinutes")((j: RedshiftJdbc) => - (j.blockingRows, j.disableIsValidQuery, j.dsiLogLevel, - j.filterLevel, j.loginTimeout, j.loglevel, j.socketTimeout, j.ssl, j.sslMode, - j.sslRootCert, j.tcpKeepAlive, j.tcpKeepAliveMinutes)) + implicit val jdbcDecoder: Decoder[RedshiftJdbc] = + Decoder.forProduct12("BlockingRowsMode", "DisableIsValidQuery", "DSILogLevel", + "FilterLevel", "loginTimeout", "loglevel", "socketTimeout", "ssl", "sslMode", + "sslRootCert", "tcpKeepAlive", "TCPKeepAliveMinutes")(RedshiftJdbc.apply) + + implicit val jdbcEncoder: Encoder.AsObject[RedshiftJdbc] = + Encoder.forProduct12("BlockingRowsMode", "DisableIsValidQuery", "DSILogLevel", + "FilterLevel", "loginTimeout", "loglevel", "socketTimeout", "ssl", "sslMode", + "sslRootCert", "tcpKeepAlive", "TCPKeepAliveMinutes")((j: RedshiftJdbc) => + (j.blockingRows, j.disableIsValidQuery, j.dsiLogLevel, + j.filterLevel, j.loginTimeout, j.loglevel, j.socketTimeout, j.ssl, j.sslMode, + j.sslRootCert, j.tcpKeepAlive, j.tcpKeepAliveMinutes)) + } /** Reference to encrypted entity inside EC2 Parameter Store */ case class ParameterStoreConfig(parameterName: String) @@ -238,7 +238,7 @@ object StorageTarget { implicit val passwordConfigDecoder: Decoder[PasswordConfig] = deriveDecoder[PasswordConfig] - implicit val schemaCriterionConfigDecoder: Decoder[SchemaCriterion] = + implicit def schemaCriterionConfigDecoder: Decoder[SchemaCriterion] = Decoder.decodeString.emap { s => SchemaCriterion.parse(s).toRight(s"Cannot parse [$s] as Iglu SchemaCriterion, it must have iglu:vendor/name/format/1-*-* format") } diff --git a/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/EventUtils.scala b/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/EventUtils.scala index 2754792d7..e6ba31423 100644 --- a/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/EventUtils.scala +++ b/modules/shredder/src/main/scala/com.snowplowanalytics.snowplow.storage/spark/EventUtils.scala @@ -90,7 +90,7 @@ object EventUtils { * @return list of columns or flattening error */ def flatten[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F], instance: SelfDescribingData[Json]): EitherT[F, FailureDetails.LoaderIgluError, List[String]] = - getOrdered(resolver, instance.schema).map { ordered => FlatData.flatten(instance.data, ordered, Some(escape)) } + getOrdered(resolver, instance.schema).map { ordered => FlatData.flatten(instance.data, ordered, FlatData.getString(Some(escape)), "") } /** Prevents data with newlines and tabs from breaking the loading process */ private def escape(s: String): String = diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 76c716f45..a248a7c9e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -31,7 +31,7 @@ object Dependencies { // Scala (Shredder) val spark = "3.0.1" val eventsManifest = "0.2.0" - val schemaDdl = "0.10.0" + val schemaDdl = "0.12.0" // Java (Loader) val slf4j = "1.7.30"