Skip to content

Commit

Permalink
RDB Loader: add --dry-run option (close #31)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Sep 6, 2017
1 parent 6778f14 commit 21fbf33
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ import utils.{ Common, Compat }
* @param steps collected steps
* @param logKey file on S3 to dump logs
* @param folder specific run-folder to load (skipping discovery)
* @param dryRun if RDB Loader should just discover data and print SQL
*/
case class CliConfig(
configYaml: SnowplowConfig,
target: StorageTarget,
steps: Set[Step],
logKey: S3.Key,
folder: Option[S3.Folder])
folder: Option[S3.Folder],
dryRun: Boolean)

object CliConfig {

Expand Down Expand Up @@ -72,6 +74,9 @@ object CliConfig {
opt[String]("folder").valueName("<s3-folder>").
action((x, c) => c.copy(folder = Some(x)))

opt[Unit]("dry-run").
action((_, c) => c.copy(dryRun = true))

help("help").text("prints this usage text")

}
Expand Down Expand Up @@ -103,6 +108,7 @@ object CliConfig {
* @param include sequence of decoded steps to include
* @param skip sequence of decoded steps to skip
* @param logkey filename, where RDB log dump will be saved
* @param dryRun if RDB Loader should just discover data and print SQL
*/
private[config] case class RawConfig(
config: String,
Expand All @@ -111,28 +117,29 @@ object CliConfig {
include: Seq[Step.IncludeStep],
skip: Seq[Step.SkipStep],
logkey: String,
folder: Option[String])
folder: Option[String],
dryRun: Boolean)

// Always invalid initial parsing configuration
private[this] val rawCliConfig = RawConfig("", "", "", Nil, Nil, "", None)
private[this] val rawCliConfig = RawConfig("", "", "", Nil, Nil, "", None, false)

/**
* Validated and transform initial raw cli arguments into
* ready-to-use `CliConfig`, aggregating errors if any
*
* @param cliConfig initial raw arguments
* @param rawConfig initial raw arguments
* @return application config in case of success or
* non empty list of config errors in case of failure
*/
private[config] def transform(cliConfig: RawConfig): ValidatedNel[ConfigError, CliConfig] = {
val config = base64decode(cliConfig.config).flatMap(SnowplowConfig.parse).toValidatedNel
val logkey = S3.Key.parse(cliConfig.logkey).leftMap(DecodingError).toValidatedNel
val target = loadTarget(cliConfig.resolver, cliConfig.target)
val steps = Step.constructSteps(cliConfig.skip.toSet, cliConfig.include.toSet)
val folder = cliConfig.folder.map(f => S3.Folder.parse(f).leftMap(DecodingError).toValidatedNel).sequence
private[config] def transform(rawConfig: RawConfig): ValidatedNel[ConfigError, CliConfig] = {
val config = base64decode(rawConfig.config).flatMap(SnowplowConfig.parse).toValidatedNel
val logkey = S3.Key.parse(rawConfig.logkey).leftMap(DecodingError).toValidatedNel
val target = loadTarget(rawConfig.resolver, rawConfig.target)
val steps = Step.constructSteps(rawConfig.skip.toSet, rawConfig.include.toSet)
val folder = rawConfig.folder.map(f => S3.Folder.parse(f).leftMap(DecodingError).toValidatedNel).sequence

(target |@| config |@| logkey |@| folder).map {
case (t, c, l, f) => CliConfig(c, t, steps, l, f)
case (t, c, l, f) => CliConfig(c, t, steps, l, f, rawConfig.dryRun)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (c) 2012-2017 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader
package interpreters

import java.nio.file._

import cats._
import cats.implicits._
import com.amazonaws.services.s3.AmazonS3

import scala.collection.mutable.ListBuffer

import com.snowplowanalytics.snowplow.scalatracker.Tracker

// This project
import config.CliConfig
import LoaderA._
import loaders.Common.SqlString
import utils.Common
import com.snowplowanalytics.snowplow.rdbloader.{ Log => ExitLog }

/**
 * Dry-run interpreter: interprets `LoaderA` effects at the end-of-the-world
 * without performing any destructive side-effects. SQL statements, file
 * copies and filesystem operations are only *recorded* into in-memory logs,
 * which are rendered by `getDryRunLogs` and attached to the final dump/exit
 * output. Read-only S3 interactions (listing, key checks) and tracking are
 * still performed for real, so data discovery behaves as in a normal run.
 *
 * @param cliConfig parsed application configuration
 * @param amazonS3 AWS S3 client, used for (read-only) discovery and log dump
 * @param tracker optional Snowplow tracker for monitoring events
 */
class DryRunInterpreter private[interpreters](
  cliConfig: CliConfig,
  amazonS3: AmazonS3,
  tracker: Option[Tracker]) extends Interpreter {

  // Mutable accumulators filled in during interpretation, rendered by `getDryRunLogs`
  private val logQueries = ListBuffer.empty[SqlString]
  private val logCopyFiles = ListBuffer.empty[Path]
  private val logMessages = ListBuffer.empty[String]

  /**
   * Render all information collected during the dry run as a single
   * human-readable report. The SQL section always states whether queries
   * were collected; empty message/file sections are omitted entirely
   */
  def getDryRunLogs: String = {
    val queries =
      if (logQueries.nonEmpty) "Performed SQL Queries:\n" + logQueries.mkString("\n")
      else "No SQL queries performed"
    val messages =
      if (logMessages.nonEmpty) "Debug messages:\n" + logMessages.mkString("\n")
      else ""
    val files =
      if (logCopyFiles.nonEmpty) "Files loaded via stdin:\n" + logCopyFiles.mkString("\n")
      else ""

    // Drop empty sections so the report contains no stray blank lines
    List(queries, messages, files).filter(_.nonEmpty).mkString("\n")
  }

  def run: LoaderA ~> Id = new (LoaderA ~> Id) {

    def apply[A](effect: LoaderA[A]): Id[A] = {
      effect match {
        // S3 reads are harmless, perform them for real so discovery output is accurate
        case ListS3(folder) =>
          S3Interpreter.list(amazonS3, folder).map(summaries => summaries.map(S3.getKey))
        case KeyExists(key) =>
          S3Interpreter.keyExists(amazonS3, key)
        case DownloadData(source, dest) =>
          // No-op: record the intent only, nothing is actually downloaded
          logMessages.append(s"Downloading data from [$source] to [$dest]")
          List.empty[Path].asRight[LoaderError]

        case ExecuteQuery(query) =>
          logQueries.append(query)
          0.asRight[LoaderError]
        case ExecuteTransaction(transactionalQueries) =>
          // Record queries wrapped in BEGIN/COMMIT exactly as they would be executed
          val transaction = PgInterpreter.makeTransaction(transactionalQueries)
          logQueries.appendAll(transaction).asRight
        case ExecuteQueries(queries) =>
          // was `logQueries.appendAll(queries).asRight` — that Either was built and discarded
          logQueries.appendAll(queries)
          0L.asRight[LoaderError]
        case CopyViaStdin(files, _) =>
          // Will never work while `DownloadData` is noop
          logCopyFiles.appendAll(files)
          0L.asRight[LoaderError]

        case CreateTmpDir =>
          // Nothing is created on disk; a placeholder path is returned
          logMessages.append("Created temporary directory")
          Paths.get("tmp").asRight

        case DeleteDir(path) =>
          logMessages.append(s"Deleted temporary directory [${path.toString}]").asRight


        case Sleep(timeout) =>
          // NOTE(review): dry run still sleeps for real; presumably kept so
          // retry/consistency loops behave with realistic timing — confirm
          Thread.sleep(timeout)
        case Track(result) =>
          result match {
            case ExitLog.LoadingSucceeded(_) =>
              TrackerInterpreter.trackSuccess(tracker)
            case ExitLog.LoadingFailed(message, _) =>
              // Strip credentials from the message before it leaves the process
              val sanitizedMessage = Common.sanitize(message, List(cliConfig.target.password, cliConfig.target.username))
              TrackerInterpreter.trackError(tracker, sanitizedMessage)
          }
        case Dump(result) =>
          // Attach the collected dry-run report to the regular log dump
          val actionResult = result.toString + "\n"
          val dryRunResult = "Dry-run action: \n" + getDryRunLogs
          TrackerInterpreter.dumpStdout(amazonS3, cliConfig.logKey, actionResult + dryRunResult)
        case Exit(loadResult, dumpResult) =>
          println("Dry-run action: \n" + getDryRunLogs)
          TrackerInterpreter.exit(loadResult, dumpResult)
      }
    }
  }
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,132 +13,34 @@
package com.snowplowanalytics.snowplow.rdbloader
package interpreters

import java.io.IOException
import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes
import java.sql.Connection

import cats._
import cats.implicits._

import com.amazonaws.services.s3.AmazonS3

import scala.util.control.NonFatal

import com.snowplowanalytics.snowplow.scalatracker.Tracker

// This project
import config.CliConfig
import LoaderA._
import LoaderError.LoaderLocalError
import utils.Common
import com.snowplowanalytics.snowplow.rdbloader.{ Log => ExitLog }

/**
* Interpreter performs all actual side-effecting work,
* interpreting `Action` at the end-of-the-world.
* It contains and handles configuration, connections and mutable state,
* all real-world interactions, except argument parsing
*/
class Interpreter private(
cliConfig: CliConfig,
dbConnection: Either[LoaderError, Connection],
amazonS3: AmazonS3,
tracker: Option[Tracker]) {

def run: LoaderA ~> Id = new (LoaderA ~> Id) {

def apply[A](effect: LoaderA[A]): Id[A] = {
effect match {
case ListS3(folder) =>
S3Interpreter.list(amazonS3, folder).map(summaries => summaries.map(S3.getKey))
case KeyExists(key) =>
S3Interpreter.keyExists(amazonS3, key)
case DownloadData(source, dest) =>
S3Interpreter.downloadData(amazonS3, source, dest)

case ExecuteQuery(query) =>
for {
conn <- dbConnection
res <- PgInterpreter.executeQuery(conn)(query)
} yield res
case ExecuteTransaction(queries) =>
for {
conn <- dbConnection
res <- PgInterpreter.executeTransaction(conn, queries)
} yield res
case ExecuteQueries(queries) =>
for {
conn <- dbConnection
res <- PgInterpreter.executeQueries(conn, queries)
} yield res
case CopyViaStdin(files, query) =>
for {
conn <- dbConnection
res <- PgInterpreter.copyViaStdin(conn, files, query)
} yield res

case CreateTmpDir =>
try {
Files.createTempDirectory("rdb-loader").asRight
} catch {
case NonFatal(e) => LoaderLocalError("Cannot create temporary directory.\n" + e.toString).asLeft
}
case DeleteDir(path) =>
try {
Files.walkFileTree(path, Interpreter.DeleteVisitor).asRight[LoaderError].void
} catch {
case NonFatal(e) => LoaderLocalError(s"Cannot delete directory [${path.toString}].\n" + e.toString).asLeft
}


case Sleep(timeout) =>
Thread.sleep(timeout)
case Track(result) =>
result match {
case ExitLog.LoadingSucceeded(_) =>
TrackerInterpreter.trackSuccess(tracker)
case ExitLog.LoadingFailed(message, _) =>
val sanitizedMessage = Common.sanitize(message, List(cliConfig.target.password, cliConfig.target.username))
TrackerInterpreter.trackError(tracker, sanitizedMessage)
}
case Dump(result) =>
TrackerInterpreter.dumpStdout(amazonS3, cliConfig.logKey, result.toString)
case Exit(loadResult, dumpResult) =>
dbConnection.foreach(c => c.close())
TrackerInterpreter.exit(loadResult, dumpResult)
}
}
}
/**
 * Common interface of the side-effecting (`RealWorldInterpreter`) and
 * `DryRunInterpreter` implementations: a natural transformation that
 * executes `LoaderA` effects at the end-of-the-world
 */
trait Interpreter {
  /** Natural transformation interpreting each `LoaderA` effect into a concrete result */
  def run: LoaderA ~> Id
}

object Interpreter {

/**
 * File visitor recursively deleting a directory tree: each file is removed
 * as it is visited, and each directory is removed after its contents have
 * been visited (at which point it is empty), so the walk never fails on a
 * non-empty directory
 */
object DeleteVisitor extends SimpleFileVisitor[Path] {
  override def visitFile(file: Path, attrs: BasicFileAttributes) = {
    Files.delete(file)
    FileVisitResult.CONTINUE
  }

  // Invoked on the way back up, after all children were deleted by `visitFile`
  override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
    Files.delete(dir)
    FileVisitResult.CONTINUE
  }
}

/**
* Initialize clients/connections for interpreter and interpreter itself
*
* @param cliConfig RDB Loader app configuration
* @return prepared interpreter
*/
* Initialize clients/connections for interpreter and interpreter itself
*
* @param cliConfig RDB Loader app configuration
* @return prepared interpreter
*/
def initialize(cliConfig: CliConfig): Interpreter = {

// dbConnection is Either because not required for log dump
val dbConnection = PgInterpreter.getConnection(cliConfig.target)
val amazonS3 = S3Interpreter.getClient(cliConfig.configYaml.aws)
val tracker = TrackerInterpreter.initializeTracking(cliConfig.configYaml.monitoring)

new Interpreter(cliConfig, dbConnection, amazonS3, tracker)
if (cliConfig.dryRun) {
new DryRunInterpreter(cliConfig, amazonS3, tracker)
} else {
// dbConnection is Either because not required for log dump
val dbConnection = PgInterpreter.getConnection(cliConfig.target)
new RealWorldInterpreter(cliConfig, dbConnection, amazonS3, tracker)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ object PgInterpreter {

def executeTransaction(conn: Connection, queries: List[SqlString]): Either[StorageTargetError, Unit] =
if (queries.nonEmpty) {
val begin = SqlString.unsafeCoerce("BEGIN;")
val commit = SqlString.unsafeCoerce("COMMIT;")
val transaction = (begin :: queries) :+ commit
val transaction = makeTransaction(queries)
executeQueries(conn, transaction).void
} else Right(())

Expand Down Expand Up @@ -96,6 +94,13 @@ object PgInterpreter {
case NonFatal(e) => Left(StorageTargetError(e.toString))
}

/** Surround given queries with BEGIN/COMMIT statements, forming a single SQL transaction */
def makeTransaction(queries: List[SqlString]): List[SqlString] =
  List(SqlString.unsafeCoerce("BEGIN;")) ::: queries ::: List(SqlString.unsafeCoerce("COMMIT;"))

/**
* Get Redshift or Postgres connection
*/
Expand Down
Loading

0 comments on commit 21fbf33

Please sign in to comment.