Snowflake Loader: auto-configure staging location paths (close #1059)
spenes committed Nov 18, 2022
1 parent c1d786f commit f47d44b
Showing 29 changed files with 341 additions and 180 deletions.
5 changes: 1 addition & 4 deletions config/loader/aws/snowflake.config.minimal.hocon
@@ -11,9 +11,6 @@
"schema": "atomic",
"database": "snowplow",

"transformedStage": {
"name": "snowplow_stage"
"location": "s3://bucket/transformed/"
}
"transformedStage": "snowplow_stage"
}
}
14 changes: 2 additions & 12 deletions config/loader/aws/snowflake.config.reference.hocon
@@ -37,20 +37,10 @@
     # DB name
     "database": "snowplow",
     # A stage where the data is stored. Must be provided if 'NoCreds' is chosen as load auth method.
-    "transformedStage": {
-      # The name of the stage
-      "name": "snowplow_stage"
-      # The S3 path used as stage location
-      "location": "s3://bucket/transformed/"
-    }
+    "transformedStage": "snowplow_stage"
     # A stage where the data for monitoring.folders is stored.
     # Must be provided iff monitoring.folders is configured and 'NoCreds' is chosen as load auth method.
-    "folderMonitoringStage": {
-      # The name of the stage
-      "name": "snowplow_folders_stage"
-      # The S3 path used as stage location
-      "location": "s3://bucket/monitoring/"
-    }
+    "folderMonitoringStage": "snowplow_folders_stage"
     # An optional host name that will take a priority over automatically derived
     "jdbcHost": "acme.eu-central-1.snowflake.com"
     # Optional, default method is 'NoCreds'
5 changes: 1 addition & 4 deletions config/loader/gcp/snowflake.config.minimal.hocon
@@ -14,9 +14,6 @@
"schema": "atomic",
"database": "snowplow",

"transformedStage": {
"name": "snowplow_stage"
"location": "gs://bucket/transformed/"
}
"transformedStage": "snowplow_stage"
}
}
14 changes: 2 additions & 12 deletions config/loader/gcp/snowflake.config.reference.hocon
@@ -31,20 +31,10 @@
     # DB name
     "database": "snowplow",
     # A stage where the data is stored.
-    "transformedStage": {
-      # The name of the stage
-      "name": "snowplow_stage"
-      # The GCS path used as stage location
-      "location": "gs://bucket/transformed/"
-    }
+    "transformedStage": "snowplow_stage"
     # A stage where the data for monitoring.folders is stored.
     # Must be provided iff monitoring.folders is configured.
-    "folderMonitoringStage": {
-      # The name of the stage
-      "name": "snowplow_folders_stage"
-      # The GCS path used as stage location
-      "location": "gs://bucket/monitoring/"
-    }
+    "folderMonitoringStage": "snowplow_folders_stage"
     # An optional host name that will take a priority over automatically derived
     "jdbcHost": "acme.eu-central-1.snowflake.com"
   },
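In all four configs the stage setting collapses to just the stage name: per the commit title, the loader now derives the staging location path itself instead of requiring a "location" field that had to be kept in sync with the stage definition in the warehouse. The Snowflake side of that lookup is not part of this diff, but the Statement.StagePath case and initQuery hook added below suggest it happens once at startup. A hypothetical sketch of such a lookup with doobie (the DESCRIBE STAGE row shape and the parsing are assumptions, not the loader's actual implementation):

import doobie.{ConnectionIO, Fragment}
import doobie.implicits._

object StageLocationSketch {
  // Hypothetical: ask Snowflake where a named stage points.
  // DESCRIBE STAGE returns one row per stage property; the row whose
  // `property` column is "URL" carries the storage location. The
  // five-column row shape assumed here may not match the real output.
  def stageLocation(stageName: String): ConnectionIO[Option[String]] =
    (fr"DESCRIBE STAGE" ++ Fragment.const(stageName))
      .query[(String, String, String, String, String)]
      .to[List]
      .map(_.collectFirst { case (_, "URL", _, value, _) => value })
}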
@@ -203,6 +203,7 @@ object BlobStorage {
   private def fixPrefix(s: String): String =
     if (s.startsWith("s3n")) "s3" + s.stripPrefix("s3n")
     else if (s.startsWith("s3a")) "s3" + s.stripPrefix("s3a")
+    else if (s.startsWith("gcs")) "gs" + s.stripPrefix("gcs")
     else s
 
   def noop[F[_]: MonadThrow]: BlobStorage[F] = new BlobStorage[F] {
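The new gcs branch in fixPrefix normalizes gcs:// URIs to the canonical gs:// scheme, presumably because Snowflake reports GCP stage locations with the gcs:// prefix while the rest of the loader works with gs://. A self-contained sketch of the behavior (the function body mirrors the diff above; the sample values are illustrative):

object FixPrefixSketch {
  // Mirrors the fixPrefix shown above: normalize alternate URI schemes
  // (s3n, s3a, gcs) to the canonical s3:// and gs:// forms.
  private def fixPrefix(s: String): String =
    if (s.startsWith("s3n")) "s3" + s.stripPrefix("s3n")
    else if (s.startsWith("s3a")) "s3" + s.stripPrefix("s3a")
    else if (s.startsWith("gcs")) "gs" + s.stripPrefix("gcs")
    else s

  def main(args: Array[String]): Unit = {
    assert(fixPrefix("s3a://bucket/transformed/") == "s3://bucket/transformed/")
    assert(fixPrefix("gcs://bucket/transformed/") == "gs://bucket/transformed/")
    assert(fixPrefix("s3://bucket/transformed/") == "s3://bucket/transformed/")
  }
}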
@@ -12,6 +12,7 @@
  */
 package com.snowplowanalytics.snowplow.loader.databricks
 
+import cats.Monad
 import cats.data.NonEmptyList
 
 import doobie.Fragment
@@ -28,6 +29,7 @@ import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
 import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns}
 import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Entity}
 import com.snowplowanalytics.snowplow.rdbloader.db.{Manifest, Statement, Target}
+import com.snowplowanalytics.snowplow.rdbloader.dsl.DAO
 import com.snowplowanalytics.snowplow.rdbloader.cloud.LoadAuthService.LoadAuthMethod
 import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType}
 import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable
@@ -38,10 +40,10 @@ object Databricks {
   val UnstructPrefix = "unstruct_event_"
   val ContextsPrefix = "contexts_"
 
-  def build(config: Config[StorageTarget]): Either[String, Target] = {
+  def build(config: Config[StorageTarget]): Either[String, Target[Unit]] = {
     config.storage match {
       case tgt: StorageTarget.Databricks =>
-        val result = new Target {
+        val result = new Target[Unit] {
 
           override val requiresEventsColumns: Boolean = true
 
@@ -53,16 +55,19 @@
           override def getLoadStatements(
             discovery: DataDiscovery,
             eventTableColumns: EventTableColumns,
-            loadAuthMethod: LoadAuthMethod
+            loadAuthMethod: LoadAuthMethod,
+            i: Unit
           ): LoadStatements = {
             val toCopy = ColumnsToCopy.fromDiscoveredData(discovery)
             val toSkip = ColumnsToSkip(getEntityColumnsPresentInDbOnly(eventTableColumns, toCopy))
 
             NonEmptyList.one(
-              Statement.EventsCopy(discovery.base, discovery.compression, toCopy, toSkip, discovery.typesInfo, loadAuthMethod)
+              Statement.EventsCopy(discovery.base, discovery.compression, toCopy, toSkip, discovery.typesInfo, loadAuthMethod, i)
             )
           }
 
+          override def initQuery[F[_]: DAO: Monad]: F[Unit] = Monad[F].unit
+
           override def createTable(schemas: SchemaList): Block = Block(Nil, Nil, Entity.Table(tgt.schema, schemas.latest.schemaKey))
 
           override def getManifest: Statement =
@@ -99,15 +104,15 @@
           val frTableName = Fragment.const(qualify(AlertingTempTableName))
           val frManifest = Fragment.const(qualify(Manifest.Name))
           sql"SELECT run_id FROM $frTableName MINUS SELECT base FROM $frManifest"
-        case Statement.FoldersCopy(source, loadAuthMethod) =>
+        case Statement.FoldersCopy(source, loadAuthMethod, _) =>
           val frTableName = Fragment.const(qualify(AlertingTempTableName))
           val frPath = Fragment.const0(source)
           val frAuth = loadAuthMethodFragment(loadAuthMethod)
 
           sql"""COPY INTO $frTableName
                 FROM (SELECT _C0::VARCHAR(512) RUN_ID FROM '$frPath' $frAuth)
                 FILEFORMAT = CSV"""
-        case Statement.EventsCopy(path, _, toCopy, toSkip, _, loadAuthMethod) =>
+        case Statement.EventsCopy(path, _, toCopy, toSkip, _, loadAuthMethod, _) =>
           val frTableName = Fragment.const(qualify(EventsTable.MainName))
           val frPath = Fragment.const0(path.append("output=good"))
           val nonNulls = toCopy.names.map(_.value)
@@ -177,6 +182,8 @@
           throw new IllegalStateException("Databricks Loader does not use EventsCopyToTempTable statement")
         case _: Statement.EventsCopyFromTempTable =>
           throw new IllegalStateException("Databricks Loader does not use EventsCopyFromTempTable statement")
+        case Statement.StagePath(_) =>
+          throw new IllegalStateException("Databricks Loader does not use StagePath statement")
         case Statement.VacuumEvents => sql"""
           OPTIMIZE ${Fragment.const0(qualify(EventsTable.MainName))}
           WHERE collector_tstamp_date >= current_timestamp() - INTERVAL ${tgt.eventsOptimizePeriod.toSeconds} second"""
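Databricks instantiates the new Target[I] parameter with Unit: it has nothing to resolve at startup, so initQuery returns Monad[F].unit and the new StagePath statement is rejected. A condensed, self-contained sketch of the pattern (names are simplified; the Snowflake side is an assumption based on the commit title, not code from this diff):

// Condensed sketch of the Target[I] pattern introduced in this commit.
trait MiniTarget[I] {
  // Runs once at startup; the result is threaded into every load statement.
  def initQuery: I
  def copyStatement(runFolder: String, init: I): String
}

// Databricks-style target: nothing to resolve up front.
object MiniDatabricks extends MiniTarget[Unit] {
  def initQuery: Unit = ()
  def copyStatement(runFolder: String, init: Unit): String =
    s"COPY INTO events FROM '$runFolder'"
}

// Snowflake-style target (assumed): resolve the stage's location once,
// then rewrite run folders as stage-relative paths.
final case class StagePaths(transformedLocation: String)
object MiniSnowflake extends MiniTarget[StagePaths] {
  def initQuery: StagePaths =
    StagePaths("s3://bucket/transformed/") // would come from the warehouse
  def copyStatement(runFolder: String, init: StagePaths): String = {
    val relative = runFolder.stripPrefix(init.transformedLocation)
    s"COPY INTO events FROM @snowplow_stage/$relative"
  }
}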
@@ -19,7 +19,7 @@ import com.snowplowanalytics.snowplow.rdbloader.Runner
 object Main extends IOApp {
 
   def run(args: List[String]): IO[ExitCode] =
-    Runner.run[IO](
+    Runner.run[IO, Unit](
       args,
       Databricks.build,
       "rdb-loader-databricks"
@@ -52,8 +52,8 @@ class DatabricksSpec extends Specification {

       val discovery = DataDiscovery(baseFolder, shreddedTypes, Compression.Gzip, TypesInfo.WideRow(PARQUET, List.empty))
 
-      target.getLoadStatements(discovery, eventsColumns, LoadAuthMethod.NoCreds) should be like {
-        case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip, _, _), Nil) =>
+      target.getLoadStatements(discovery, eventsColumns, LoadAuthMethod.NoCreds, ()) should be like {
+        case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip, _, _, _), Nil) =>
           path must beEqualTo(baseFolder)
           compression must beEqualTo(Compression.Gzip)
 
@@ -96,7 +96,15 @@
         )
       )
       val statement =
-        Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), LoadAuthMethod.NoCreds)
+        Statement.EventsCopy(
+          baseFolder,
+          Compression.Gzip,
+          toCopy,
+          toSkip,
+          TypesInfo.WideRow(PARQUET, List.empty),
+          LoadAuthMethod.NoCreds,
+          ()
+        )
 
       target.toFragment(statement).toString must beLike { case sql =>
         sql must contain(
@@ -121,7 +129,7 @@
       )
       val loadAuthMethod = LoadAuthMethod.TempCreds("testAccessKey", "testSecretKey", "testSessionToken")
       val statement =
-        Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), loadAuthMethod)
+        Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), loadAuthMethod, ())
 
       target.toFragment(statement).toString must beLike { case sql =>
         sql must contain(
@@ -137,7 +145,7 @@ object DatabricksSpec {
   val baseFolder: BlobStorage.Folder =
     BlobStorage.Folder.coerce("s3://somewhere/path")
 
-  val target: Target = Databricks
+  val target: Target[Unit] = Databricks
     .build(
       Config(
         StorageTarget.Databricks(
@@ -26,7 +26,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
 import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
 import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
 import com.snowplowanalytics.snowplow.rdbloader.db.Columns._
-import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, Control => DbControl, HealthCheck, Manifest, Statement}
+import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, Control => DbControl, HealthCheck, Manifest, Statement, Target}
 import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, NoOperation, Retries}
 import com.snowplowanalytics.snowplow.rdbloader.dsl.{
   Cache,
@@ -67,20 +67,27 @@ object Loader {
    *   auxiliary effect for communicating with database (usually `ConnectionIO`) Unlike `F` it
    *   cannot pull `A` out of DB (perform a transaction), but just claim `A` is needed and `C[A]`
    *   later can be materialized into `F[A]`
+   * @tparam I
+   *   type of the query result which is sent to the warehouse during initialization of the
+   *   application
    */
   def run[
     F[_]: Transaction[
       *[_],
       C
     ]: Concurrent: BlobStorage: Queue.Consumer: Clock: Iglu: Cache: Logging: Timer: Monitoring: ContextShift: LoadAuthService: JsonPathDiscovery,
-    C[_]: DAO: MonadThrow: Logging
+    C[_]: DAO: MonadThrow: Logging,
+    I
   ](
     config: Config[StorageTarget],
     control: Control[F],
-    telemetry: Telemetry[F]
+    telemetry: Telemetry[F],
+    target: Target[I]
   ): F[Unit] = {
-    val folderMonitoring: Stream[F, Unit] =
-      FolderMonitoring.run[F, C](config.monitoring.folders, config.readyCheck, config.storage, control.isBusy)
+    val folderMonitoring: Stream[F, Unit] = for {
+      initQueryResult <- initQuery[F, C, I](target)
+      _ <- FolderMonitoring.run[F, C, I](config.monitoring.folders, config.readyCheck, config.storage, control.isBusy, initQueryResult)
+    } yield ()
     val noOpScheduling: Stream[F, Unit] =
       NoOperation.run(config.schedules.noOperation, control.makePaused, control.signal.map(_.loading))
 
@@ -89,7 +96,10 @@
     val healthCheck =
       HealthCheck.start[F, C](config.monitoring.healthCheck)
     val loading: Stream[F, Unit] =
-      loadStream[F, C](config, control)
+      for {
+        initQueryResult <- initQuery[F, C, I](target)
+        _ <- loadStream[F, C, I](config, control, initQueryResult, target)
+      } yield ()
     val stateLogging: Stream[F, Unit] =
       Stream
         .awakeDelay[F](StateLoggingFrequency)
@@ -104,7 +114,7 @@
Logging[F].info("Target check is completed")
val noOperationPrepare = NoOperation.prepare(config.schedules.noOperation, control.makePaused) *>
Logging[F].info("No operation prepare step is completed")
val manifestInit = initRetry(Manifest.initialize[F, C](config.storage)) *>
val manifestInit = initRetry(Manifest.initialize[F, C, I](config.storage, target)) *>
Logging[F].info("Manifest initialization is completed")
val addLoadTstamp = addLoadTstampColumn[F, C](config.featureFlags.addLoadTstampColumn, config.storage) *>
Logging[F].info("Adding load_tstamp column is completed")
@@ -135,10 +145,13 @@
       *[_],
       C
     ]: Concurrent: BlobStorage: Queue.Consumer: Iglu: Cache: Logging: Timer: Monitoring: ContextShift: LoadAuthService: JsonPathDiscovery,
-    C[_]: DAO: MonadThrow: Logging
+    C[_]: DAO: MonadThrow: Logging,
+    I
   ](
     config: Config[StorageTarget],
-    control: Control[F]
+    control: Control[F],
+    initQueryResult: I,
+    target: Target[I]
   ): Stream[F, Unit] = {
     val sqsDiscovery: DiscoveryStream[F] =
       DataDiscovery.discover[F](config, control.incrementMessages)
 
     discovery
       .pauseWhen[F](control.isBusy)
-      .evalMap(processDiscovery[F, C](config, control))
+      .evalMap(processDiscovery[F, C, I](config, control, initQueryResult, target))
   }
 
   /**
@@ -158,10 +171,13 @@
    */
   private def processDiscovery[
     F[_]: Transaction[*[_], C]: Concurrent: Iglu: Logging: Timer: Monitoring: ContextShift: LoadAuthService,
-    C[_]: DAO: MonadThrow: Logging
+    C[_]: DAO: MonadThrow: Logging,
+    I
   ](
     config: Config[StorageTarget],
-    control: Control[F]
+    control: Control[F],
+    initQueryResult: I,
+    target: Target[I]
   )(
     discovery: DataDiscovery.WithOrigin
   ): F[Unit] = {
@@ -180,7 +196,7 @@
       start <- Clock[F].instantNow
       _ <- discovery.origin.timestamps.min.map(t => Monitoring[F].periodicMetrics.setEarliestKnownUnloadedData(t)).sequence.void
       loadAuth <- LoadAuthService[F].getLoadAuthMethod(config.storage.eventsLoadAuthMethod)
-      result <- Load.load[F, C](config, setStageC, control.incrementAttempts, discovery, loadAuth)
+      result <- Load.load[F, C, I](config, setStageC, control.incrementAttempts, discovery, loadAuth, initQueryResult, target)
       attempts <- control.getAndResetAttempts
       _ <- result match {
         case Right(ingested) =>
@@ -232,6 +248,10 @@
       .map(_.value.toLowerCase)
       .contains(AtomicColumns.ColumnsWithDefault.LoadTstamp.value)
 
+  /** Query to get necessary bits from the warehouse during initialization of the application */
+  private def initQuery[F[_]: Transaction[*[_], C], C[_]: DAO: MonadThrow, I](target: Target[I]): Stream[F, I] =
+    Stream.eval(Transaction[F, C].run(target.initQuery))
+
   /**
    * Handle a failure during loading. `Load.getTransaction` can fail only in one "expected" way - if
    * the folder is already loaded everything else in the transaction and outside (migration
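Both folderMonitoring and loading wrap initQuery in their own for-comprehension, so each stream materializes the initialization result once when it starts and threads it into everything downstream. A minimal fs2 sketch of that pattern (the effect and values are illustrative, not the loader's actual types):

import cats.effect.IO
import fs2.Stream

object InitQuerySketch {
  // Illustrative stand-in for Transaction[F, C].run(target.initQuery).
  val initQuery: IO[String] = IO.pure("s3://bucket/transformed/")

  // The init result is evaluated once, when the stream starts.
  val loading: Stream[IO, Unit] =
    for {
      stagePath <- Stream.eval(initQuery)
      _         <- Stream.eval(IO(println(s"COPY ... FROM '$stagePath'")))
    } yield ()
}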
@@ -26,31 +26,39 @@ import com.snowplowanalytics.snowplow.rdbloader.dsl._
 import com.snowplowanalytics.snowplow.rdbloader.dsl.Environment
 import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig
 
-/** Generic starting point for all loaders */
 object Runner {
 
-  def run[F[_]: Clock: ConcurrentEffect: ContextShift: Timer: Parallel](
+  /**
+   * Generic starting point for all loaders
+   *
+   * @tparam F
+   *   primary application's effect (usually `IO`)
+   * @tparam I
+   *   type of the query result which is sent to the warehouse during initialization of the
+   *   application
+   */
+  def run[F[_]: Clock: ConcurrentEffect: ContextShift: Timer: Parallel, I](
     argv: List[String],
-    buildStatements: BuildTarget,
+    buildStatements: BuildTarget[I],
     appName: String
   ): F[ExitCode] = {
     val result = for {
       parsed <- CliConfig.parse[F](argv)
       statements <- EitherT.fromEither[F](buildStatements(parsed.config))
       application =
         Environment
-          .initialize[F](
+          .initialize[F, I](
            parsed,
            statements,
            appName,
            generated.BuildInfo.version
          )
-          .use { env: Environment[F] =>
+          .use { env: Environment[F, I] =>
            import env._
 
            Logging[F]
              .info(s"RDB Loader ${generated.BuildInfo.version} has started.") *>
-              Loader.run[F, ConnectionIO](parsed.config, env.controlF, env.telemetryF).as(ExitCode.Success)
+              Loader.run[F, ConnectionIO, I](parsed.config, env.controlF, env.telemetryF, env.dbTarget).as(ExitCode.Success)
          }
       exitCode <- EitherT.liftF[F, String, ExitCode](application)
     } yield exitCode