Snowflake Loader: auto-configure staging location paths (close #1059)
spenes committed Nov 7, 2022
1 parent 6bf2989 commit 479596d
Showing 23 changed files with 262 additions and 131 deletions.
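
The plumbing in the diff below is shared by all loaders: `Target` gains a type parameter `I` for the result of a one-off `initQuery`, which the loader runs up front and then threads into every load statement (Databricks uses `Unit`; the Snowflake loader, per the commit title, uses it to auto-configure its staging location paths). The following is a minimal, self-contained sketch of that shape; the names `GetStagePath`, `StagePathTarget`, `NoInitTarget` and `LoaderSketch` are illustrative stand-ins, not the loader's real types.

```scala
import cats.Monad
import cats.implicits._

// Simplified stand-ins for the loader's Statement/DAO abstractions.
sealed trait Statement
object Statement {
  case object GetStagePath extends Statement
  final case class EventsCopy(stagePath: Option[String]) extends Statement
}

trait DAO[F[_]] {
  def executeQuery(statement: Statement): F[String]
}
object DAO {
  def apply[F[_]](implicit ev: DAO[F]): DAO[F] = ev
}

// Each storage target is now parameterised by the result of its init query.
trait Target[I] {
  def initQuery[F[_]: DAO: Monad]: F[I]
  def eventsCopy(initResult: I): Statement
}

// A target that needs nothing from the warehouse before loading (Databricks-style).
object NoInitTarget extends Target[Unit] {
  def initQuery[F[_]: DAO: Monad]: F[Unit] = Monad[F].unit
  def eventsCopy(initResult: Unit): Statement = Statement.EventsCopy(None)
}

// A target that resolves its staging location once and reuses it in every
// copy statement (hypothetical Snowflake-style illustration).
object StagePathTarget extends Target[String] {
  def initQuery[F[_]: DAO: Monad]: F[String] =
    DAO[F].executeQuery(Statement.GetStagePath)
  def eventsCopy(stagePath: String): Statement =
    Statement.EventsCopy(Some(stagePath))
}

object LoaderSketch {
  // The loader runs initQuery once and passes its result to the statement builders.
  def load[F[_]: DAO: Monad, I](target: Target[I]): F[Statement] =
    target.initQuery[F].map(target.eventsCopy)
}
```

This mirrors the changes below: `Databricks.build` returns a `Target[Unit]` whose `initQuery` is `Monad[F].unit`, while `Loader`, `Runner`, `Manifest` and `Migration` gain an `I` parameter so the init-query result reaches `Statement.EventsCopy` and `Statement.FoldersCopy`.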
@@ -12,6 +12,7 @@
*/
package com.snowplowanalytics.snowplow.loader.databricks

import cats.Monad
import cats.data.NonEmptyList

import doobie.Fragment
@@ -28,6 +29,7 @@ import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns}
import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Entity}
import com.snowplowanalytics.snowplow.rdbloader.db.{Manifest, Statement, Target}
import com.snowplowanalytics.snowplow.rdbloader.dsl.DAO
import com.snowplowanalytics.snowplow.rdbloader.cloud.LoadAuthService.LoadAuthMethod
import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType}
import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable
@@ -38,10 +40,10 @@ object Databricks {
val UnstructPrefix = "unstruct_event_"
val ContextsPrefix = "contexts_"

def build(config: Config[StorageTarget]): Either[String, Target] = {
def build(config: Config[StorageTarget]): Either[String, Target[Unit]] = {
config.storage match {
case tgt: StorageTarget.Databricks =>
val result = new Target {
val result = new Target[Unit] {

override val requiresEventsColumns: Boolean = true

@@ -53,16 +55,19 @@
override def getLoadStatements(
discovery: DataDiscovery,
eventTableColumns: EventTableColumns,
loadAuthMethod: LoadAuthMethod
loadAuthMethod: LoadAuthMethod,
i: Unit
): LoadStatements = {
val toCopy = ColumnsToCopy.fromDiscoveredData(discovery)
val toSkip = ColumnsToSkip(getEntityColumnsPresentInDbOnly(eventTableColumns, toCopy))

NonEmptyList.one(
Statement.EventsCopy(discovery.base, discovery.compression, toCopy, toSkip, discovery.typesInfo, loadAuthMethod)
Statement.EventsCopy(discovery.base, discovery.compression, toCopy, toSkip, discovery.typesInfo, loadAuthMethod, i)
)
}

override def initQuery[F[_]: DAO: Monad]: F[Unit] = Monad[F].unit

override def createTable(schemas: SchemaList): Block = Block(Nil, Nil, Entity.Table(tgt.schema, schemas.latest.schemaKey))

override def getManifest: Statement =
@@ -99,15 +104,15 @@ object Databricks {
val frTableName = Fragment.const(qualify(AlertingTempTableName))
val frManifest = Fragment.const(qualify(Manifest.Name))
sql"SELECT run_id FROM $frTableName MINUS SELECT base FROM $frManifest"
case Statement.FoldersCopy(source, loadAuthMethod) =>
case Statement.FoldersCopy(source, loadAuthMethod, _) =>
val frTableName = Fragment.const(qualify(AlertingTempTableName))
val frPath = Fragment.const0(source)
val frAuth = loadAuthMethodFragment(loadAuthMethod)

sql"""COPY INTO $frTableName
FROM (SELECT _C0::VARCHAR(512) RUN_ID FROM '$frPath' $frAuth)
FILEFORMAT = CSV"""
case Statement.EventsCopy(path, _, toCopy, toSkip, _, loadAuthMethod) =>
case Statement.EventsCopy(path, _, toCopy, toSkip, _, loadAuthMethod, _) =>
val frTableName = Fragment.const(qualify(EventsTable.MainName))
val frPath = Fragment.const0(path.append("output=good"))
val nonNulls = toCopy.names.map(_.value)
@@ -177,6 +182,8 @@ object Databricks {
throw new IllegalStateException("Databricks Loader does not use EventsCopyToTempTable statement")
case _: Statement.EventsCopyFromTempTable =>
throw new IllegalStateException("Databricks Loader does not use EventsCopyFromTempTable statement")
case Statement.StagePath(_) =>
throw new IllegalStateException("Databricks Loader does not use StagePath statement")
case Statement.VacuumEvents => sql"""
OPTIMIZE ${Fragment.const0(qualify(EventsTable.MainName))}
WHERE collector_tstamp_date >= current_timestamp() - INTERVAL ${tgt.eventsOptimizePeriod.toSeconds} second"""
@@ -19,7 +19,7 @@ import com.snowplowanalytics.snowplow.rdbloader.Runner
object Main extends IOApp {

def run(args: List[String]): IO[ExitCode] =
Runner.run[IO](
Runner.run[IO, Unit](
args,
Databricks.build,
"rdb-loader-databricks"
@@ -52,8 +52,8 @@ class DatabricksSpec extends Specification {

val discovery = DataDiscovery(baseFolder, shreddedTypes, Compression.Gzip, TypesInfo.WideRow(PARQUET, List.empty))

target.getLoadStatements(discovery, eventsColumns, LoadAuthMethod.NoCreds) should be like {
case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip, _, _), Nil) =>
target.getLoadStatements(discovery, eventsColumns, LoadAuthMethod.NoCreds, ()) should be like {
case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip, _, _, _), Nil) =>
path must beEqualTo(baseFolder)
compression must beEqualTo(Compression.Gzip)

@@ -96,7 +96,15 @@ class DatabricksSpec extends Specification {
)
)
val statement =
Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), LoadAuthMethod.NoCreds)
Statement.EventsCopy(
baseFolder,
Compression.Gzip,
toCopy,
toSkip,
TypesInfo.WideRow(PARQUET, List.empty),
LoadAuthMethod.NoCreds,
()
)

target.toFragment(statement).toString must beLike { case sql =>
sql must contain(
@@ -121,7 +129,7 @@
)
val loadAuthMethod = LoadAuthMethod.TempCreds("testAccessKey", "testSecretKey", "testSessionToken")
val statement =
Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), loadAuthMethod)
Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), loadAuthMethod, ())

target.toFragment(statement).toString must beLike { case sql =>
sql must contain(
@@ -137,7 +145,7 @@ object DatabricksSpec {
val baseFolder: BlobStorage.Folder =
BlobStorage.Folder.coerce("s3://somewhere/path")

val target: Target = Databricks
val target: Target[Unit] = Databricks
.build(
Config(
StorageTarget.Databricks(
@@ -26,7 +26,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
import com.snowplowanalytics.snowplow.rdbloader.db.Columns._
import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, Control => DbControl, HealthCheck, Manifest, Statement}
import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, Control => DbControl, HealthCheck, Manifest, Statement, Target}
import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, NoOperation, Retries}
import com.snowplowanalytics.snowplow.rdbloader.dsl.{
Cache,
@@ -73,14 +73,18 @@ object Loader {
*[_],
C
]: Concurrent: BlobStorage: Queue.Consumer: Clock: Iglu: Cache: Logging: Timer: Monitoring: ContextShift: LoadAuthService: JsonPathDiscovery,
C[_]: DAO: MonadThrow: Logging
C[_]: DAO: MonadThrow: Logging,
I
](
config: Config[StorageTarget],
control: Control[F],
telemetry: Telemetry[F]
telemetry: Telemetry[F],
target: Target[I]
): F[Unit] = {
val folderMonitoring: Stream[F, Unit] =
FolderMonitoring.run[F, C](config.monitoring.folders, config.readyCheck, config.storage, control.isBusy)
val folderMonitoring: Stream[F, Unit] = for {
initQueryResult <- initQuery[F, C, I](target)
_ <- FolderMonitoring.run[F, C, I](config.monitoring.folders, config.readyCheck, config.storage, control.isBusy, initQueryResult)
} yield ()
val noOpScheduling: Stream[F, Unit] =
NoOperation.run(config.schedules.noOperation, control.makePaused, control.signal.map(_.loading))

@@ -89,7 +93,10 @@
val healthCheck =
HealthCheck.start[F, C](config.monitoring.healthCheck)
val loading: Stream[F, Unit] =
loadStream[F, C](config, control)
for {
initQueryResult <- initQuery[F, C, I](target)
_ <- loadStream[F, C, I](config, control, initQueryResult, target)
} yield ()
val stateLogging: Stream[F, Unit] =
Stream
.awakeDelay[F](StateLoggingFrequency)
@@ -104,7 +111,7 @@
Logging[F].info("Target check is completed")
val noOperationPrepare = NoOperation.prepare(config.schedules.noOperation, control.makePaused) *>
Logging[F].info("No operation prepare step is completed")
val manifestInit = initRetry(Manifest.initialize[F, C](config.storage)) *>
val manifestInit = initRetry(Manifest.initialize[F, C, I](config.storage, target)) *>
Logging[F].info("Manifest initialization is completed")
val addLoadTstamp = addLoadTstampColumn[F, C](config.featureFlags.addLoadTstampColumn, config.storage) *>
Logging[F].info("Adding load_tstamp column is completed")
@@ -135,10 +142,13 @@
*[_],
C
]: Concurrent: BlobStorage: Queue.Consumer: Iglu: Cache: Logging: Timer: Monitoring: ContextShift: LoadAuthService: JsonPathDiscovery,
C[_]: DAO: MonadThrow: Logging
C[_]: DAO: MonadThrow: Logging,
I
](
config: Config[StorageTarget],
control: Control[F]
control: Control[F],
initQueryResult: I,
target: Target[I]
): Stream[F, Unit] = {
val sqsDiscovery: DiscoveryStream[F] =
DataDiscovery.discover[F](config, control.incrementMessages)
@@ -148,7 +158,7 @@

discovery
.pauseWhen[F](control.isBusy)
.evalMap(processDiscovery[F, C](config, control))
.evalMap(processDiscovery[F, C, I](config, control, initQueryResult, target))
}

/**
@@ -158,10 +168,13 @@
*/
private def processDiscovery[
F[_]: Transaction[*[_], C]: Concurrent: Iglu: Logging: Timer: Monitoring: ContextShift: LoadAuthService,
C[_]: DAO: MonadThrow: Logging
C[_]: DAO: MonadThrow: Logging,
I
](
config: Config[StorageTarget],
control: Control[F]
control: Control[F],
initQueryResult: I,
target: Target[I]
)(
discovery: DataDiscovery.WithOrigin
): F[Unit] = {
@@ -180,7 +193,7 @@
start <- Clock[F].instantNow
_ <- discovery.origin.timestamps.min.map(t => Monitoring[F].periodicMetrics.setEarliestKnownUnloadedData(t)).sequence.void
loadAuth <- LoadAuthService[F].getLoadAuthMethod(config.storage.eventsLoadAuthMethod)
result <- Load.load[F, C](config, setStageC, control.incrementAttempts, discovery, loadAuth)
result <- Load.load[F, C, I](config, setStageC, control.incrementAttempts, discovery, loadAuth, initQueryResult, target)
attempts <- control.getAndResetAttempts
_ <- result match {
case Right(ingested) =>
@@ -232,6 +245,9 @@ object Loader {
.map(_.value.toLowerCase)
.contains(AtomicColumns.ColumnsWithDefault.LoadTstamp.value)

private def initQuery[F[_]: Transaction[*[_], C], C[_]: DAO: MonadThrow, I](target: Target[I]) =
Stream.eval(Transaction[F, C].run(target.initQuery))

/**
* Handle a failure during loading. `Load.getTransaction` can fail only in one "expected" way - if
* the folder is already loaded everything else in the transaction and outside (migration
@@ -29,28 +29,28 @@ import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig
/** Generic starting point for all loaders */
object Runner {

def run[F[_]: Clock: ConcurrentEffect: ContextShift: Timer: Parallel](
def run[F[_]: Clock: ConcurrentEffect: ContextShift: Timer: Parallel, I](
argv: List[String],
buildStatements: BuildTarget,
buildStatements: BuildTarget[I],
appName: String
): F[ExitCode] = {
val result = for {
parsed <- CliConfig.parse[F](argv)
statements <- EitherT.fromEither[F](buildStatements(parsed.config))
application =
Environment
.initialize[F](
.initialize[F, I](
parsed,
statements,
appName,
generated.BuildInfo.version
)
.use { env: Environment[F] =>
.use { env: Environment[F, I] =>
import env._

Logging[F]
.info(s"RDB Loader ${generated.BuildInfo.version} has started.") *>
Loader.run[F, ConnectionIO](parsed.config, env.controlF, env.telemetryF).as(ExitCode.Success)
Loader.run[F, ConnectionIO, I](parsed.config, env.controlF, env.telemetryF, env.dbTarget).as(ExitCode.Success)
}
exitCode <- EitherT.liftF[F, String, ExitCode](application)
} yield exitCode
@@ -29,11 +29,12 @@ object Manifest {
"shredded_cardinality"
).map(ColumnName)

def initialize[F[_]: MonadThrow: Logging: Timer, C[_]: DAO: Monad](
target: StorageTarget
def initialize[F[_]: MonadThrow: Logging: Timer, C[_]: DAO: Monad, I](
config: StorageTarget,
target: Target[I]
)(implicit F: Transaction[F, C]
): F[Unit] =
F.transact(setup[C](target.schema, target)).attempt.flatMap {
F.transact(setup[C, I](config.schema, config, target)).attempt.flatMap {
case Right(InitStatus.Created) =>
Logging[F].info("The manifest table has been created")
case Right(InitStatus.Migrated) =>
@@ -47,8 +48,12 @@
MonadError[F, Throwable].raiseError(new IllegalStateException(error.toString))
}

def setup[F[_]: Monad: DAO](schema: String, target: StorageTarget): F[InitStatus] = target match {
case _: Databricks => create[F].as(InitStatus.Created)
def setup[F[_]: Monad: DAO, I](
schema: String,
config: StorageTarget,
target: Target[I]
): F[InitStatus] = config match {
case _: Databricks => create[F, I](target).as(InitStatus.Created)
case _ =>
for {
exists <- Control.tableExists[F](Name)
@@ -57,14 +62,14 @@
legacy = existingTableColumns.toSet === LegacyColumns.toSet
status <- if (legacy)
Control.renameTable[F](Name, LegacyName) *>
create[F].as[InitStatus](InitStatus.Migrated)
create[F, I](target).as[InitStatus](InitStatus.Migrated)
else
Monad[F].pure[InitStatus](InitStatus.NoChanges)
} yield status
else create[F].as(InitStatus.Created)
else create[F, I](target).as(InitStatus.Created)
_ <- status match {
case InitStatus.Migrated | InitStatus.Created =>
target match {
config match {
case _: Redshift => DAO[F].executeUpdate(Statement.CommentOn(s"$schema.$Name", "0.2.0"), DAO.Purpose.NonLoading)
case _ => Monad[F].unit
}
@@ -81,8 +86,8 @@
DAO[F].executeQueryOption[Entry](Statement.ManifestGet(base))(Entry.entryRead)

/** Create manifest table */
def create[F[_]: DAO: Functor]: F[Unit] =
DAO[F].executeUpdate(DAO[F].target.getManifest, DAO.Purpose.NonLoading).void
def create[F[_]: DAO: Functor, I](target: Target[I]): F[Unit] =
DAO[F].executeUpdate(target.getManifest, DAO.Purpose.NonLoading).void

case class Entry(ingestion: Instant, meta: LoaderMessage.ManifestItem)

@@ -147,7 +147,10 @@ object Migration {
}

/** Inspect DB state and create a [[Migration]] object that contains all necessary actions */
def build[F[_]: Transaction[*[_], C]: MonadThrow: Iglu, C[_]: MonadThrow: Logging: DAO](discovery: DataDiscovery): F[Migration[C]] = {
def build[F[_]: Transaction[*[_], C]: MonadThrow: Iglu, C[_]: MonadThrow: Logging: DAO, I](
discovery: DataDiscovery,
target: Target[I]
): F[Migration[C]] = {
val descriptions: LoaderAction[F, List[Description]] =
discovery.shreddedTypes.filterNot(_.isAtomic).traverse {
case s: ShreddedType.Tabular =>
@@ -165,7 +168,7 @@
// Duplicate schemas cause migration vector to double failing the second migration. Therefore deduplication
// with toSet.toList
schemaList.toSet.toList
.traverseFilter(buildBlock[C])
.traverseFilter(buildBlock[C, I](_, target))
.flatMap(blocks => Migration.fromBlocks[C](blocks))
case Left(error) =>
MonadThrow[C].raiseError[Migration[C]](error)
@@ -178,8 +181,7 @@
def empty[F[_]: Applicative]: Migration[F] =
Migration[F](Nil, Applicative[F].unit)

def buildBlock[F[_]: MonadThrow: DAO](description: Description): F[Option[Block]] = {
val target = DAO[F].target
def buildBlock[F[_]: MonadThrow: DAO, I](description: Description, target: Target[I]): F[Option[Block]] =
description match {
case Description.Table(schemas) =>
val tableName = StringUtils.getTableName(schemas.latest)
@@ -203,10 +205,9 @@
case Description.NoMigration =>
Monad[F].pure(none[Block])
}
}

def migrateTable(
target: Target,
def migrateTable[I](
target: Target[I],
current: SchemaKey,
columns: List[ColumnName],
schemaList: SchemaList