Skip to content

Commit

Permalink
Loader: switch from Free monad to cats-effect IO (close #184)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Oct 23, 2020
1 parent b5d541b commit a19c643
Show file tree
Hide file tree
Showing 46 changed files with 1,643 additions and 2,359 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ lazy val loader = project.in(file("."))
.settings(BuildSettings.assemblySettings)
.settings(resolvers ++= Dependencies.resolutionRepos)
.settings(
addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1"),
libraryDependencies ++= Seq(
Dependencies.decline,
Dependencies.scalaTracker,
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
logLevel := Level.Warn

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
addSbtPlugin("com.localytics" % "sbt-dynamodb" % "2.0.3")
addSbtPlugin("com.localytics" % "sbt-dynamodb" % "2.0.3")
171 changes: 0 additions & 171 deletions src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderA.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@
package com.snowplowanalytics.snowplow.rdbloader

import cats.Show
import cats.implicits._
import cats.data.ValidatedNel

import com.snowplowanalytics.snowplow.rdbloader.discovery.DiscoveryFailure

/** Root error type */
sealed trait LoaderError
sealed trait LoaderError extends Product with Serializable

object LoaderError {

Expand Down Expand Up @@ -63,7 +62,9 @@ object LoaderError {
validated.leftMap(errors => DiscoveryError(errors.toList): LoaderError).toEither

/** Other errors */
case class LoaderLocalError(message: String) extends LoaderError
case class LoaderLocalError(message: String) extends Throwable with LoaderError {
override def getMessage: String = message
}

/** Error happened during DDL-statements execution. Critical */
case class MigrationError(message: String) extends LoaderError
Expand Down
36 changes: 0 additions & 36 deletions src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Log.scala

This file was deleted.

98 changes: 48 additions & 50 deletions src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,71 +12,69 @@
*/
package com.snowplowanalytics.snowplow.rdbloader

import cats.syntax.flatMap._
import cats.Monad
import cats.data.Validated._
import cats.implicits._
import cats.effect.{ExitCode, IO, IOApp }

// This project
import interpreters.Interpreter
import config.CliConfig
import loaders.Common.{ load, discover }
import com.snowplowanalytics.snowplow.rdbloader.dsl.{AWS, JDBC, Logging, RealWorld}
import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig
import com.snowplowanalytics.snowplow.rdbloader.loaders.Common.{discover, load}
import com.snowplowanalytics.snowplow.rdbloader.utils.{S3, SSH}

/**
* Application entry point
*/
object Main {
object Main extends IOApp {
/**
* If arguments or config is invalid exit with 1
* and print errors to EMR stdout
* If arguments and config are valid, but loading failed
* print message to `track` bucket
*/
def main(argv: Array[String]): Unit = {
def run(argv: List[String]): IO[ExitCode] =
CliConfig.parse(argv) match {
case Valid(config) =>
val status = run(config)
sys.exit(status)
case Invalid(errors) =>
println("Configuration error")
errors.toList.foreach(error => println(error.message))
sys.exit(1)
}
}
RealWorld.initialize[IO](config).flatMap { dsls =>
import dsls._

/**
* Initialize interpreter from parsed configuration and
* run all IO actions through it. Should never throw exceptions
*
* @param config parsed configuration
* @return exit code status. 0 for success, 1 if anything went wrong
*/
def run(config: CliConfig): Int = {
val interpreter = Interpreter.initialize(config)
val result = for {
discovery <- discover[IO](config)
jdbc = SSH.resource[IO](config.target.sshTunnel) *>
JDBC.interpreter[IO](config.target, config.dryRun)
_ <- LoaderAction(jdbc.use { implicit conn => load[IO](config, discovery).value })
} yield ()

val actions: Action[Int] = for {
data <- discover(config).flatTap(db.Migration.perform(config.target.schema)).value
result <- data match {
case Right(discovery) => load(config, discovery).value
case Left(LoaderError.StorageTargetError(message)) =>
val upadtedMessage = s"$message\n${interpreter.getLastCopyStatements}"
ActionE.liftError(LoaderError.StorageTargetError(upadtedMessage))
case Left(error) => ActionE.liftError(error)
}
message = utils.Common.interpret(config, result)
_ <- LoaderA.track(message)
status <- close(config.logKey, message)
} yield status

actions.foldMap(interpreter.run)
}
result
.value
.attempt
.map { // TODO: write shorter; and figure out if unit test is possible
case Left(e) =>
e.printStackTrace(System.out)
(LoaderError.LoaderLocalError(e.getMessage): LoaderError).asLeft
case Right(e) => e
}
.flatMap(res => close[IO](config.logKey, res))
}
case Invalid(errors) =>
IO.delay(println("Configuration error")) *>
errors.traverse_(message => IO.delay(println(message))).as(ExitCode.Error)
}

/** Get exit status based on all previous steps */
private def close(logKey: Option[S3.Key], message: Log) = {
logKey match {
case Some(key) => for {
dumpResult <- LoaderA.dump(key)
status <- LoaderA.exit(message, Some(dumpResult))
} yield status
case None => LoaderA.exit(message, None)
private def close[F[_]: Monad: Logging: AWS](logKey: Option[S3.Key], result: Either[LoaderError, Unit]): F[ExitCode] = {
val dumping = logKey.traverse(Logging[F].dump).flatMap { dumpResult =>
(result, dumpResult) match {
case (Right(_), None) =>
Logging[F].print(s"INFO: Logs were not dumped to S3").as(ExitCode.Success)
case (Left(_), None) =>
Logging[F].print(s"INFO: Logs were not dumped to S3").as(ExitCode.Error)
case (Right(_), Some(Right(key))) =>
Logging[F].print(s"INFO: Logs successfully dumped to S3 [$key]").as(ExitCode.Success)
case (Left(_), Some(Right(key))) =>
Logging[F].print(s"INFO: Logs successfully dumped to S3 [$key]").as(ExitCode.Error)
case (_, Some(Left(error))) =>
Logging[F].print(s"ERROR: Log-dumping failed: [$error]").as(ExitCode.Error)
}
}

Logging[F].track(result) *> dumping
}
}
Loading

0 comments on commit a19c643

Please sign in to comment.