From 58f5d52dc0f4bf4c1cda12db1de9acda3648976e Mon Sep 17 00:00:00 2001 From: spenes Date: Mon, 7 Nov 2022 00:19:10 +0300 Subject: [PATCH] Snowflake Loader: auto-configure staging location paths (close #1059) --- .../loader/databricks/Databricks.scala | 19 +++-- .../snowplow/loader/databricks/Main.scala | 2 +- .../loader/databricks/DatabricksSpec.scala | 18 ++-- .../snowplow/rdbloader/Loader.scala | 43 +++++++--- .../snowplow/rdbloader/Runner.scala | 10 +-- .../snowplow/rdbloader/db/Manifest.scala | 25 +++--- .../snowplow/rdbloader/db/Migration.scala | 15 ++-- .../snowplow/rdbloader/db/Statement.scala | 13 ++- .../snowplow/rdbloader/db/Target.scala | 11 ++- .../snowplow/rdbloader/dsl/DAO.scala | 8 +- .../snowplow/rdbloader/dsl/Environment.scala | 13 +-- .../rdbloader/dsl/FolderMonitoring.scala | 36 ++++++-- .../snowplow/rdbloader/loading/Load.scala | 53 +++++++++--- .../snowplow/rdbloader/package.scala | 2 +- .../rdbloader/dsl/FolderMonitoringSpec.scala | 8 +- .../snowplow/rdbloader/loading/LoadSpec.scala | 33 +++++--- .../snowplow/rdbloader/test/PureDAO.scala | 18 ++-- .../snowplow/loader/redshift/Main.scala | 2 +- .../snowplow/loader/redshift/Redshift.scala | 20 +++-- .../loader/redshift/RedshiftSpec.scala | 2 +- .../loader/redshift/db/MigrationSpec.scala | 4 +- .../snowplow/loader/snowflake/Main.scala | 2 +- .../snowplow/loader/snowflake/Snowflake.scala | 82 ++++++++++++++++--- 23 files changed, 308 insertions(+), 131 deletions(-) diff --git a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala index 9e52b99e0..ab1751ce2 100644 --- a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala +++ b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala @@ -12,6 +12,7 @@ */ package com.snowplowanalytics.snowplow.loader.databricks +import cats.Monad import cats.data.NonEmptyList import doobie.Fragment @@ -28,6 +29,7 @@ import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns} import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Entity} import com.snowplowanalytics.snowplow.rdbloader.db.{Manifest, Statement, Target} +import com.snowplowanalytics.snowplow.rdbloader.dsl.DAO import com.snowplowanalytics.snowplow.rdbloader.cloud.LoadAuthService.LoadAuthMethod import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable @@ -38,10 +40,10 @@ object Databricks { val UnstructPrefix = "unstruct_event_" val ContextsPrefix = "contexts_" - def build(config: Config[StorageTarget]): Either[String, Target] = { + def build(config: Config[StorageTarget]): Either[String, Target[Unit]] = { config.storage match { case tgt: StorageTarget.Databricks => - val result = new Target { + val result = new Target[Unit] { override val requiresEventsColumns: Boolean = true @@ -53,16 +55,19 @@ object Databricks { override def getLoadStatements( discovery: DataDiscovery, eventTableColumns: EventTableColumns, - loadAuthMethod: LoadAuthMethod + loadAuthMethod: LoadAuthMethod, + i: Unit ): LoadStatements = { val toCopy = ColumnsToCopy.fromDiscoveredData(discovery) val toSkip = 
ColumnsToSkip(getEntityColumnsPresentInDbOnly(eventTableColumns, toCopy)) NonEmptyList.one( - Statement.EventsCopy(discovery.base, discovery.compression, toCopy, toSkip, discovery.typesInfo, loadAuthMethod) + Statement.EventsCopy(discovery.base, discovery.compression, toCopy, toSkip, discovery.typesInfo, loadAuthMethod, i) ) } + override def initQuery[F[_]: DAO: Monad]: F[Unit] = Monad[F].unit + override def createTable(schemas: SchemaList): Block = Block(Nil, Nil, Entity.Table(tgt.schema, schemas.latest.schemaKey)) override def getManifest: Statement = @@ -99,7 +104,7 @@ object Databricks { val frTableName = Fragment.const(qualify(AlertingTempTableName)) val frManifest = Fragment.const(qualify(Manifest.Name)) sql"SELECT run_id FROM $frTableName MINUS SELECT base FROM $frManifest" - case Statement.FoldersCopy(source, loadAuthMethod) => + case Statement.FoldersCopy(source, loadAuthMethod, _) => val frTableName = Fragment.const(qualify(AlertingTempTableName)) val frPath = Fragment.const0(source) val frAuth = loadAuthMethodFragment(loadAuthMethod) @@ -107,7 +112,7 @@ object Databricks { sql"""COPY INTO $frTableName FROM (SELECT _C0::VARCHAR(512) RUN_ID FROM '$frPath' $frAuth) FILEFORMAT = CSV""" - case Statement.EventsCopy(path, _, toCopy, toSkip, _, loadAuthMethod) => + case Statement.EventsCopy(path, _, toCopy, toSkip, _, loadAuthMethod, _) => val frTableName = Fragment.const(qualify(EventsTable.MainName)) val frPath = Fragment.const0(path.append("output=good")) val nonNulls = toCopy.names.map(_.value) @@ -177,6 +182,8 @@ object Databricks { throw new IllegalStateException("Databricks Loader does not use EventsCopyToTempTable statement") case _: Statement.EventsCopyFromTempTable => throw new IllegalStateException("Databricks Loader does not use EventsCopyFromTempTable statement") + case Statement.StagePath(_) => + throw new IllegalStateException("Databricks Loader does not use StagePath statement") case Statement.VacuumEvents => sql""" OPTIMIZE ${Fragment.const0(qualify(EventsTable.MainName))} WHERE collector_tstamp_date >= current_timestamp() - INTERVAL ${tgt.eventsOptimizePeriod.toSeconds} second""" diff --git a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Main.scala b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Main.scala index f5ab4c05d..57bff9aef 100644 --- a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Main.scala +++ b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Main.scala @@ -19,7 +19,7 @@ import com.snowplowanalytics.snowplow.rdbloader.Runner object Main extends IOApp { def run(args: List[String]): IO[ExitCode] = - Runner.run[IO]( + Runner.run[IO, Unit]( args, Databricks.build, "rdb-loader-databricks" diff --git a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala index 27b9e01a6..e21baa3ea 100644 --- a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala +++ b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala @@ -52,8 +52,8 @@ class DatabricksSpec extends Specification { val discovery = DataDiscovery(baseFolder, shreddedTypes, Compression.Gzip, TypesInfo.WideRow(PARQUET, List.empty)) - 
target.getLoadStatements(discovery, eventsColumns, LoadAuthMethod.NoCreds) should be like { - case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip, _, _), Nil) => + target.getLoadStatements(discovery, eventsColumns, LoadAuthMethod.NoCreds, ()) should be like { + case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip, _, _, _), Nil) => path must beEqualTo(baseFolder) compression must beEqualTo(Compression.Gzip) @@ -96,7 +96,15 @@ class DatabricksSpec extends Specification { ) ) val statement = - Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), LoadAuthMethod.NoCreds) + Statement.EventsCopy( + baseFolder, + Compression.Gzip, + toCopy, + toSkip, + TypesInfo.WideRow(PARQUET, List.empty), + LoadAuthMethod.NoCreds, + () + ) target.toFragment(statement).toString must beLike { case sql => sql must contain( @@ -121,7 +129,7 @@ class DatabricksSpec extends Specification { ) val loadAuthMethod = LoadAuthMethod.TempCreds("testAccessKey", "testSecretKey", "testSessionToken") val statement = - Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), loadAuthMethod) + Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, TypesInfo.WideRow(PARQUET, List.empty), loadAuthMethod, ()) target.toFragment(statement).toString must beLike { case sql => sql must contain( @@ -137,7 +145,7 @@ object DatabricksSpec { val baseFolder: BlobStorage.Folder = BlobStorage.Folder.coerce("s3://somewhere/path") - val target: Target = Databricks + val target: Target[Unit] = Databricks .build( Config( StorageTarget.Databricks( diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala index 8bbdfc850..e4c0208bc 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala @@ -26,7 +26,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.telemetry.Telemetry import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue} import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} import com.snowplowanalytics.snowplow.rdbloader.db.Columns._ -import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, Control => DbControl, HealthCheck, Manifest, Statement} +import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, Control => DbControl, HealthCheck, Manifest, Statement, Target} import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, NoOperation, Retries} import com.snowplowanalytics.snowplow.rdbloader.dsl.{ Cache, @@ -73,14 +73,18 @@ object Loader { *[_], C ]: Concurrent: BlobStorage: Queue.Consumer: Clock: Iglu: Cache: Logging: Timer: Monitoring: ContextShift: LoadAuthService: JsonPathDiscovery, - C[_]: DAO: MonadThrow: Logging + C[_]: DAO: MonadThrow: Logging, + I ]( config: Config[StorageTarget], control: Control[F], - telemetry: Telemetry[F] + telemetry: Telemetry[F], + target: Target[I] ): F[Unit] = { - val folderMonitoring: Stream[F, Unit] = - FolderMonitoring.run[F, C](config.monitoring.folders, config.readyCheck, config.storage, control.isBusy) + val folderMonitoring: Stream[F, Unit] = for { + initQueryResult <- initQuery[F, C, I](target) + _ <- FolderMonitoring.run[F, C, I](config.monitoring.folders, config.readyCheck, 
config.storage, control.isBusy, initQueryResult) + } yield () val noOpScheduling: Stream[F, Unit] = NoOperation.run(config.schedules.noOperation, control.makePaused, control.signal.map(_.loading)) @@ -89,7 +93,10 @@ object Loader { val healthCheck = HealthCheck.start[F, C](config.monitoring.healthCheck) val loading: Stream[F, Unit] = - loadStream[F, C](config, control) + for { + initQueryResult <- initQuery[F, C, I](target) + _ <- loadStream[F, C, I](config, control, initQueryResult, target) + } yield () val stateLogging: Stream[F, Unit] = Stream .awakeDelay[F](StateLoggingFrequency) @@ -104,7 +111,7 @@ object Loader { Logging[F].info("Target check is completed") val noOperationPrepare = NoOperation.prepare(config.schedules.noOperation, control.makePaused) *> Logging[F].info("No operation prepare step is completed") - val manifestInit = initRetry(Manifest.initialize[F, C](config.storage)) *> + val manifestInit = initRetry(Manifest.initialize[F, C, I](config.storage, target)) *> Logging[F].info("Manifest initialization is completed") val addLoadTstamp = addLoadTstampColumn[F, C](config.featureFlags.addLoadTstampColumn, config.storage) *> Logging[F].info("Adding load_tstamp column is completed") @@ -135,10 +142,13 @@ object Loader { *[_], C ]: Concurrent: BlobStorage: Queue.Consumer: Iglu: Cache: Logging: Timer: Monitoring: ContextShift: LoadAuthService: JsonPathDiscovery, - C[_]: DAO: MonadThrow: Logging + C[_]: DAO: MonadThrow: Logging, + I ]( config: Config[StorageTarget], - control: Control[F] + control: Control[F], + initQueryResult: I, + target: Target[I] ): Stream[F, Unit] = { val sqsDiscovery: DiscoveryStream[F] = DataDiscovery.discover[F](config, control.incrementMessages) @@ -148,7 +158,7 @@ object Loader { discovery .pauseWhen[F](control.isBusy) - .evalMap(processDiscovery[F, C](config, control)) + .evalMap(processDiscovery[F, C, I](config, control, initQueryResult, target)) } /** @@ -158,10 +168,13 @@ object Loader { */ private def processDiscovery[ F[_]: Transaction[*[_], C]: Concurrent: Iglu: Logging: Timer: Monitoring: ContextShift: LoadAuthService, - C[_]: DAO: MonadThrow: Logging + C[_]: DAO: MonadThrow: Logging, + I ]( config: Config[StorageTarget], - control: Control[F] + control: Control[F], + initQueryResult: I, + target: Target[I] )( discovery: DataDiscovery.WithOrigin ): F[Unit] = { @@ -180,7 +193,7 @@ object Loader { start <- Clock[F].instantNow _ <- discovery.origin.timestamps.min.map(t => Monitoring[F].periodicMetrics.setEarliestKnownUnloadedData(t)).sequence.void loadAuth <- LoadAuthService[F].getLoadAuthMethod(config.storage.eventsLoadAuthMethod) - result <- Load.load[F, C](config, setStageC, control.incrementAttempts, discovery, loadAuth) + result <- Load.load[F, C, I](config, setStageC, control.incrementAttempts, discovery, loadAuth, initQueryResult, target) attempts <- control.getAndResetAttempts _ <- result match { case Right(ingested) => @@ -232,6 +245,10 @@ object Loader { .map(_.value.toLowerCase) .contains(AtomicColumns.ColumnsWithDefault.LoadTstamp.value) + /** Query to get necessary bits from the warehouse during initialization of the application */ + private def initQuery[F[_]: Transaction[*[_], C], C[_]: DAO: MonadThrow, I](target: Target[I]): Stream[F, I] = + Stream.eval(Transaction[F, C].run(target.initQuery)) + /** * Handle a failure during loading. 
`Load.getTransaction` can fail only in one "expected" way - if * the folder is already loaded everything else in the transaction and outside (migration diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Runner.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Runner.scala index 8179f47e3..3b104f47f 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Runner.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Runner.scala @@ -29,9 +29,9 @@ import com.snowplowanalytics.snowplow.rdbloader.config.CliConfig /** Generic starting point for all loaders */ object Runner { - def run[F[_]: Clock: ConcurrentEffect: ContextShift: Timer: Parallel]( + def run[F[_]: Clock: ConcurrentEffect: ContextShift: Timer: Parallel, I]( argv: List[String], - buildStatements: BuildTarget, + buildStatements: BuildTarget[I], appName: String ): F[ExitCode] = { val result = for { @@ -39,18 +39,18 @@ object Runner { statements <- EitherT.fromEither[F](buildStatements(parsed.config)) application = Environment - .initialize[F]( + .initialize[F, I]( parsed, statements, appName, generated.BuildInfo.version ) - .use { env: Environment[F] => + .use { env: Environment[F, I] => import env._ Logging[F] .info(s"RDB Loader ${generated.BuildInfo.version} has started.") *> - Loader.run[F, ConnectionIO](parsed.config, env.controlF, env.telemetryF).as(ExitCode.Success) + Loader.run[F, ConnectionIO, I](parsed.config, env.controlF, env.telemetryF, env.dbTarget).as(ExitCode.Success) } exitCode <- EitherT.liftF[F, String, ExitCode](application) } yield exitCode diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Manifest.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Manifest.scala index 7e0afccbc..705e14607 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Manifest.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Manifest.scala @@ -29,11 +29,12 @@ object Manifest { "shredded_cardinality" ).map(ColumnName) - def initialize[F[_]: MonadThrow: Logging: Timer, C[_]: DAO: Monad]( - target: StorageTarget + def initialize[F[_]: MonadThrow: Logging: Timer, C[_]: DAO: Monad, I]( + config: StorageTarget, + target: Target[I] )(implicit F: Transaction[F, C] ): F[Unit] = - F.transact(setup[C](target.schema, target)).attempt.flatMap { + F.transact(setup[C, I](config.schema, config, target)).attempt.flatMap { case Right(InitStatus.Created) => Logging[F].info("The manifest table has been created") case Right(InitStatus.Migrated) => @@ -47,8 +48,12 @@ object Manifest { MonadError[F, Throwable].raiseError(new IllegalStateException(error.toString)) } - def setup[F[_]: Monad: DAO](schema: String, target: StorageTarget): F[InitStatus] = target match { - case _: Databricks => create[F].as(InitStatus.Created) + def setup[F[_]: Monad: DAO, I]( + schema: String, + config: StorageTarget, + target: Target[I] + ): F[InitStatus] = config match { + case _: Databricks => create[F, I](target).as(InitStatus.Created) case _ => for { exists <- Control.tableExists[F](Name) @@ -57,14 +62,14 @@ object Manifest { legacy = existingTableColumns.toSet === LegacyColumns.toSet status <- if (legacy) Control.renameTable[F](Name, LegacyName) *> - create[F].as[InitStatus](InitStatus.Migrated) + create[F, I](target).as[InitStatus](InitStatus.Migrated) else Monad[F].pure[InitStatus](InitStatus.NoChanges) } yield status - else 
create[F].as(InitStatus.Created) + else create[F, I](target).as(InitStatus.Created) _ <- status match { case InitStatus.Migrated | InitStatus.Created => - target match { + config match { case _: Redshift => DAO[F].executeUpdate(Statement.CommentOn(s"$schema.$Name", "0.2.0"), DAO.Purpose.NonLoading) case _ => Monad[F].unit } @@ -81,8 +86,8 @@ object Manifest { DAO[F].executeQueryOption[Entry](Statement.ManifestGet(base))(Entry.entryRead) /** Create manifest table */ - def create[F[_]: DAO: Functor]: F[Unit] = - DAO[F].executeUpdate(DAO[F].target.getManifest, DAO.Purpose.NonLoading).void + def create[F[_]: DAO: Functor, I](target: Target[I]): F[Unit] = + DAO[F].executeUpdate(target.getManifest, DAO.Purpose.NonLoading).void case class Entry(ingestion: Instant, meta: LoaderMessage.ManifestItem) diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala index 54ff64588..5bc745bc1 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala @@ -147,7 +147,10 @@ object Migration { } /** Inspect DB state and create a [[Migration]] object that contains all necessary actions */ - def build[F[_]: Transaction[*[_], C]: MonadThrow: Iglu, C[_]: MonadThrow: Logging: DAO](discovery: DataDiscovery): F[Migration[C]] = { + def build[F[_]: Transaction[*[_], C]: MonadThrow: Iglu, C[_]: MonadThrow: Logging: DAO, I]( + discovery: DataDiscovery, + target: Target[I] + ): F[Migration[C]] = { val descriptions: LoaderAction[F, List[Description]] = discovery.shreddedTypes.filterNot(_.isAtomic).traverse { case s: ShreddedType.Tabular => @@ -165,7 +168,7 @@ object Migration { // Duplicate schemas cause migration vector to double failing the second migration. 
Therefore deduplication // with toSet.toList schemaList.toSet.toList - .traverseFilter(buildBlock[C]) + .traverseFilter(buildBlock[C, I](_, target)) .flatMap(blocks => Migration.fromBlocks[C](blocks)) case Left(error) => MonadThrow[C].raiseError[Migration[C]](error) @@ -178,8 +181,7 @@ object Migration { def empty[F[_]: Applicative]: Migration[F] = Migration[F](Nil, Applicative[F].unit) - def buildBlock[F[_]: MonadThrow: DAO](description: Description): F[Option[Block]] = { - val target = DAO[F].target + def buildBlock[F[_]: MonadThrow: DAO, I](description: Description, target: Target[I]): F[Option[Block]] = description match { case Description.Table(schemas) => val tableName = StringUtils.getTableName(schemas.latest) @@ -203,10 +205,9 @@ object Migration { case Description.NoMigration => Monad[F].pure(none[Block]) } - } - def migrateTable( - target: Target, + def migrateTable[I]( + target: Target[I], current: SchemaKey, columns: List[ColumnName], schemaList: SchemaList diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala index 36b9be4e4..73064ee0a 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala @@ -51,16 +51,21 @@ object Statement { case object CreateAlertingTempTable extends Statement case object DropAlertingTempTable extends Statement case object FoldersMinusManifest extends Statement - case class FoldersCopy(source: BlobStorage.Folder, loadAuthMethod: LoadAuthMethod) extends Statement + case class FoldersCopy[T]( + source: BlobStorage.Folder, + loadAuthMethod: LoadAuthMethod, + initQueryResult: T + ) extends Statement // Loading - case class EventsCopy( + case class EventsCopy[T]( path: BlobStorage.Folder, compression: Compression, columnsToCopy: ColumnsToCopy, columnsToSkip: ColumnsToSkip, typesInfo: TypesInfo, - loadAuthMethod: LoadAuthMethod + loadAuthMethod: LoadAuthMethod, + initQueryResult: T ) extends Statement with Loading { def table: String = EventsTable.MainName @@ -113,4 +118,6 @@ object Statement { // Optimize (housekeeping i.e. 
vacuum in redshift, optimize in databricks) case object VacuumManifest extends Statement case object VacuumEvents extends Statement + + case class StagePath(stage: String) extends Statement } diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala index 9eed4c18b..7613e98fd 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala @@ -12,12 +12,15 @@ */ package com.snowplowanalytics.snowplow.rdbloader.db +import cats.Monad + import com.snowplowanalytics.iglu.schemaddl.migrations.{Migration => SchemaMigration, SchemaList} import com.snowplowanalytics.snowplow.rdbloader.LoadStatements import com.snowplowanalytics.snowplow.rdbloader.db.Columns.EventTableColumns import com.snowplowanalytics.snowplow.rdbloader.db.Migration.Block import com.snowplowanalytics.snowplow.rdbloader.cloud.LoadAuthService.LoadAuthMethod import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} +import com.snowplowanalytics.snowplow.rdbloader.dsl.DAO import doobie.Fragment /** @@ -26,7 +29,7 @@ import doobie.Fragment * or transform agnostic `Statement` into DB-specific SQL dialect, it uses the `Target` which is * typically tightly coupled with `DAO` */ -trait Target { +trait Target[I] { /** Transform DB-agnostic, generic `Statement` into a concrete SQL statement */ def toFragment(statement: Statement): Fragment @@ -42,7 +45,8 @@ trait Target { def getLoadStatements( discovery: DataDiscovery, eventTableColumns: EventTableColumns, - loadAuthMethod: LoadAuthMethod + loadAuthMethod: LoadAuthMethod, + initQueryResult: I ): LoadStatements /** Get DDL of a manifest table */ @@ -54,6 +58,9 @@ trait Target { /** Create a table with columns dervived from list of Iglu schemas */ def createTable(schemas: SchemaList): Block + /** Query to get necessary bits from the warehouse during initialization of the application */ + def initQuery[F[_]: DAO: Monad]: F[I] + /** * Add a new column into `events`, i.e. extend a wide row. 
Unlike `updateTable` it always operates * on `events` table diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/DAO.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/DAO.scala index 90d212e88..6fb42c4b8 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/DAO.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/DAO.scala @@ -38,9 +38,6 @@ trait DAO[C[_]] { /** Execute query and parse results into 0 or one `A` */ def executeQueryOption[A](query: Statement)(implicit A: Read[A]): C[Option[A]] - - /** Get the DB interpreter */ - def target: Target } object DAO { @@ -53,7 +50,7 @@ object DAO { def apply[F[_]](implicit ev: DAO[F]): DAO[F] = ev - def connectionIO(dbTarget: Target, timeouts: Config.Timeouts): DAO[ConnectionIO] = new DAO[ConnectionIO] { + def connectionIO[I](dbTarget: Target[I], timeouts: Config.Timeouts): DAO[ConnectionIO] = new DAO[ConnectionIO] { /** Execute single SQL statement (against target in interpreter) */ def executeUpdate(sql: Statement, purpose: Purpose): ConnectionIO[Int] = { @@ -81,8 +78,5 @@ object DAO { dbTarget.toFragment(query).execWith { HPS.setQueryTimeout(timeouts.nonLoading.toSeconds.toInt).flatMap(_ => HPS.executeQuery(HRS.getOption)) } - - def target: Target = - dbTarget } } diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala index 0281b1177..b6e62800a 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Environment.scala @@ -41,7 +41,7 @@ import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics._ * Container for most of interepreters to be used in Main JDBC will be instantiated only when * necessary, and as a `Reousrce` */ -class Environment[F[_]]( +class Environment[F[_], I]( cache: Cache[F], logging: Logging[F], monitoring: Monitoring[F], @@ -51,7 +51,7 @@ class Environment[F[_]]( loadAuthService: LoadAuthService[F], jsonPathDiscovery: JsonPathDiscovery[F], transaction: Transaction[F, ConnectionIO], - target: Target, + target: Target[I], timeouts: Config.Timeouts, control: Control[F], telemetry: Telemetry[F] @@ -70,6 +70,7 @@ class Environment[F[_]]( implicit val loggingC: Logging[ConnectionIO] = logging.mapK(transaction.arrowBack) val controlF: Control[F] = control val telemetryF: Telemetry[F] = telemetry + val dbTarget: Target[I] = target } object Environment { @@ -84,12 +85,12 @@ object Environment { secretStore: SecretStore[F] ) - def initialize[F[_]: Clock: ConcurrentEffect: ContextShift: Timer: Parallel]( + def initialize[F[_]: Clock: ConcurrentEffect: ContextShift: Timer: Parallel, I]( cli: CliConfig, - statementer: Target, + statementer: Target[I], appName: String, appVersion: String - ): Resource[F, Environment[F]] = + ): Resource[F, Environment[F, I]] = for { blocker <- Blocker[F] implicit0(logger: Logger[F]) = Slf4jLogger.getLogger[F] @@ -121,7 +122,7 @@ object Environment { getRegionForTelemetry(cli.config), getCloudForTelemetry(cli.config) ) - } yield new Environment[F]( + } yield new Environment[F, I]( cache, logging, monitoring, diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala 
b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala index ba51a23f6..9c146437b 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala @@ -163,19 +163,28 @@ object FolderMonitoring { * `shredding_complete.json` and turned into corresponding `AlertPayload` * @param loadFrom * list shredded folders + * @param readyCheck + * config for retry logic + * @param storageTarget + * target storage config + * @param loadAuthMethod + * auth method used for load operation + * @param initQueryResult + * results of the queries sent to warehouse when application is initialized * @return * potentially empty list of alerts */ - def check[F[_]: MonadThrow: BlobStorage: Transaction[*[_], C]: Timer: Logging, C[_]: DAO: Monad]( + def check[F[_]: MonadThrow: BlobStorage: Transaction[*[_], C]: Timer: Logging, C[_]: DAO: Monad, I]( loadFrom: BlobStorage.Folder, readyCheck: Config.Retries, storageTarget: StorageTarget, - loadAuthMethod: LoadAuthMethod + loadAuthMethod: LoadAuthMethod, + initQueryResult: I ): F[List[AlertPayload]] = { val getBatches = for { _ <- DAO[C].executeUpdate(DropAlertingTempTable, DAO.Purpose.NonLoading) _ <- DAO[C].executeUpdate(CreateAlertingTempTable, DAO.Purpose.NonLoading) - _ <- DAO[C].executeUpdate(FoldersCopy(loadFrom, loadAuthMethod), DAO.Purpose.NonLoading) + _ <- DAO[C].executeUpdate(FoldersCopy(loadFrom, loadAuthMethod, initQueryResult), DAO.Purpose.NonLoading) onlyS3Batches <- DAO[C].executeQueryList[BlobStorage.Folder](FoldersMinusManifest) } yield onlyS3Batches @@ -214,16 +223,18 @@ object FolderMonitoring { */ def run[ F[_]: Concurrent: Timer: BlobStorage: Transaction[*[_], C]: Logging: Monitoring: MonadThrow: ContextShift: LoadAuthService, - C[_]: DAO: Monad + C[_]: DAO: Monad, + I ]( foldersCheck: Option[Config.Folders], readyCheck: Config.Retries, storageTarget: StorageTarget, - isBusy: Stream[F, Boolean] + isBusy: Stream[F, Boolean], + initQueryResult: I ): Stream[F, Unit] = foldersCheck match { case Some(folders) => - stream[F, C](folders, readyCheck, storageTarget, isBusy) + stream[F, C, I](folders, readyCheck, storageTarget, isBusy, initQueryResult) case None => Stream.eval[F, Unit](Logging[F].info("Configuration for monitoring.folders hasn't been provided - monitoring is disabled")) } @@ -232,21 +243,28 @@ object FolderMonitoring { * Same as [[run]], but without parsing preparation The stream ignores a first failure just * printing an error, hoping it's transient, but second failure in row makes the whole stream to * crash + * * @param folders * configuration for folders monitoring * @param readyCheck * configuration for target ready check + * @param storageTarget + * target storage config * @param isBusy * discrete stream signalling when folders monitoring should not work + * @param initQueryResult + * results of the queries sent to warehouse when application is initialized */ def stream[ F[_]: Transaction[*[_], C]: Concurrent: Timer: BlobStorage: Logging: Monitoring: MonadThrow: ContextShift: LoadAuthService, - C[_]: DAO: Monad + C[_]: DAO: Monad, + I ]( folders: Config.Folders, readyCheck: Config.Retries, storageTarget: StorageTarget, - isBusy: Stream[F, Boolean] + isBusy: Stream[F, Boolean], + initQueryResult: I ): Stream[F, Unit] = Stream.eval((Semaphore[F](1), Ref.of(0)).tupled).flatMap { case (lock, failed) => getOutputKeys[F](folders) @@ -259,7 +277,7 @@ 
object FolderMonitoring { sinkFolders[F](folders.since, folders.until, folders.transformerOutput, outputFolder).ifM( for { loadAuth <- LoadAuthService[F].getLoadAuthMethod(storageTarget.foldersLoadAuthMethod) - alerts <- check[F, C](outputFolder, readyCheck, storageTarget, loadAuth) + alerts <- check[F, C, I](outputFolder, readyCheck, storageTarget, loadAuth, initQueryResult) _ <- alerts.traverse_ { payload => val warn = payload.base match { case Some(folder) => Logging[F].warning(s"${payload.message} $folder") diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala index 1708c5342..0ce522872 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala @@ -23,7 +23,7 @@ import cats.effect.{Clock, Timer} import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} -import com.snowplowanalytics.snowplow.rdbloader.db.{Control, Manifest, Migration} +import com.snowplowanalytics.snowplow.rdbloader.db.{Control, Manifest, Migration, Target} import com.snowplowanalytics.snowplow.rdbloader.cloud.LoadAuthService.LoadAuthMethod import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Iglu, Logging, Monitoring, Transaction} @@ -70,24 +70,34 @@ object Load { * RDB Loader app configuration * @param setStage * function setting a stage in global state + * @param incrementAttempt + * effect that increases the number of load attempts * @param discovery * discovered folder to load + * @param loadAuthMethod + * auth method used for load operation + * @param initQueryResult + * results of the queries sent to warehouse when application is initialized + * @param target + * storage target object * @return * either alert payload in case of duplicate event or ingestion timestamp in case of success */ - def load[F[_]: MonadThrow: Logging: Timer: Iglu: Transaction[*[_], C], C[_]: MonadThrow: Logging: DAO]( + def load[F[_]: MonadThrow: Logging: Timer: Iglu: Transaction[*[_], C], C[_]: MonadThrow: Logging: DAO, I]( config: Config[StorageTarget], setStage: Stage => C[Unit], incrementAttempt: F[Unit], discovery: DataDiscovery.WithOrigin, - loadAuthMethod: LoadAuthMethod + loadAuthMethod: LoadAuthMethod, + initQueryResult: I, + target: Target[I] ): F[Either[AlertPayload, Option[Instant]]] = for { _ <- TargetCheck.blockUntilReady[F, C](config.readyCheck, config.storage) - migrations <- Migration.build[F, C](discovery.discovery) + migrations <- Migration.build[F, C, I](discovery.discovery, target) _ <- Transaction[F, C].run(setStage(Stage.MigrationPre)) _ <- migrations.preTransaction.traverse_(Transaction[F, C].run_) - transaction = getTransaction[C](setStage, discovery, loadAuthMethod)(migrations.inTransaction) + transaction = getTransaction[C, I](setStage, discovery, loadAuthMethod, initQueryResult, target)(migrations.inTransaction) result <- Retry.retryLoad(config.retries, incrementAttempt, Transaction[F, C].transact(transaction)) } yield result @@ -95,20 +105,29 @@ object Load { * Run a transaction with all load statements and with in-transaction migrations if necessary and * acknowledge the discovery message after 
transaction is successful. If the main transaction * fails it will be retried several times by a caller + * * @param setStage * function to report current loading status to global state * @param discovery * metadata about batch + * @param loadAuthMethod + * auth method used for load operation + * @param initQueryResult + * results of the queries sent to warehouse when application is initialized + * @param target + * storage target object * @param inTransactionMigrations * sequence of migration actions such as ALTER TABLE that have to run before the batch is loaded * @return * either alert payload in case of an existing folder or ingestion timestamp of the current * folder */ - def getTransaction[F[_]: Logging: Monad: DAO]( + def getTransaction[F[_]: Logging: Monad: DAO, I]( setStage: Stage => F[Unit], discovery: DataDiscovery.WithOrigin, - loadAuthMethod: LoadAuthMethod + loadAuthMethod: LoadAuthMethod, + initQueryResult: I, + target: Target[I] )( inTransactionMigrations: F[Unit] ): F[Either[AlertPayload, Option[Instant]]] = @@ -128,7 +147,7 @@ object Load { Logging[F].info(s"Loading transaction for ${discovery.origin.base} has started") *> setStage(Stage.MigrationIn) *> inTransactionMigrations *> - run[F](setLoading, discovery.discovery, loadAuthMethod) *> + run[F, I](setLoading, discovery.discovery, loadAuthMethod, initQueryResult, target) *> setStage(Stage.Committing) *> Manifest.add[F](discovery.origin.toManifestItem) *> Manifest @@ -140,20 +159,30 @@ object Load { /** * Run loading actions for atomic and shredded data * + * @param setLoading + * function to report that application is in loading state currently * @param discovery * batch discovered from message queue + * @param loadAuthMethod + * auth method used for load operation + * @param initQueryResult + * results of the queries sent to warehouse when application is initialized + * @param target + * storage target object * @return * block of VACUUM and ANALYZE statements to execute them out of a main transaction */ - def run[F[_]: Monad: Logging: DAO]( + def run[F[_]: Monad: Logging: DAO, I]( setLoading: String => F[Unit], discovery: DataDiscovery, - loadAuthMethod: LoadAuthMethod + loadAuthMethod: LoadAuthMethod, + initQueryResult: I, + target: Target[I] ): F[Unit] = for { _ <- Logging[F].info(s"Loading ${discovery.base}") - existingEventTableColumns <- if (DAO[F].target.requiresEventsColumns) Control.getColumns[F](EventsTable.MainName) else Nil.pure[F] - _ <- DAO[F].target.getLoadStatements(discovery, existingEventTableColumns, loadAuthMethod).traverse_ { statement => + existingEventTableColumns <- if (target.requiresEventsColumns) Control.getColumns[F](EventsTable.MainName) else Nil.pure[F] + _ <- target.getLoadStatements(discovery, existingEventTableColumns, loadAuthMethod, initQueryResult).traverse_ { statement => Logging[F].info(statement.title) *> setLoading(statement.table) *> DAO[F].executeUpdate(statement, DAO.Purpose.Loading).void diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala index a917a5d90..2fca9ad5d 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala @@ -43,7 +43,7 @@ package object rdbloader { type LoadStatements = NonEmptyList[Statement.Loading] /** A function to build a specific `Target` or error in case invalid config is passed */ - type 
BuildTarget = Config[StorageTarget] => Either[String, Target] + type BuildTarget[I] = Config[StorageTarget] => Either[String, Target[I]] /** Loading effect, producing value of type `A` with possible `LoaderError` */ type LoaderAction[F[_], A] = EitherT[F, LoaderError, A] diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala index 812c04c7b..3374f29a2 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala @@ -40,7 +40,7 @@ class FolderMonitoringSpec extends Specification { List( PureTransaction.CommitMessage, TestState.LogEntry.Sql(Statement.FoldersMinusManifest), - TestState.LogEntry.Sql(Statement.FoldersCopy(BlobStorage.Folder.coerce("s3://bucket/shredded/"), LoadAuthMethod.NoCreds)), + TestState.LogEntry.Sql(Statement.FoldersCopy(BlobStorage.Folder.coerce("s3://bucket/shredded/"), LoadAuthMethod.NoCreds, ())), TestState.LogEntry.Sql(Statement.CreateAlertingTempTable), TestState.LogEntry.Sql(Statement.DropAlertingTempTable), PureTransaction.StartMessage, @@ -60,7 +60,7 @@ class FolderMonitoringSpec extends Specification { ) val (state, result) = - FolderMonitoring.check[Pure, Pure](loadFrom, exampleReadyCheckConfig, exampleDatabricks, LoadAuthMethod.NoCreds).run + FolderMonitoring.check[Pure, Pure, Unit](loadFrom, exampleReadyCheckConfig, exampleDatabricks, LoadAuthMethod.NoCreds, ()).run state must beEqualTo(expectedState) result must beRight.like { @@ -81,7 +81,7 @@ class FolderMonitoringSpec extends Specification { List( PureTransaction.CommitMessage, TestState.LogEntry.Sql(Statement.FoldersMinusManifest), - TestState.LogEntry.Sql(Statement.FoldersCopy(BlobStorage.Folder.coerce("s3://bucket/shredded/"), LoadAuthMethod.NoCreds)), + TestState.LogEntry.Sql(Statement.FoldersCopy(BlobStorage.Folder.coerce("s3://bucket/shredded/"), LoadAuthMethod.NoCreds, ())), TestState.LogEntry.Sql(Statement.CreateAlertingTempTable), TestState.LogEntry.Sql(Statement.DropAlertingTempTable), PureTransaction.StartMessage, @@ -101,7 +101,7 @@ class FolderMonitoringSpec extends Specification { ) val (state, result) = - FolderMonitoring.check[Pure, Pure](loadFrom, exampleReadyCheckConfig, exampleDatabricks, LoadAuthMethod.NoCreds).run + FolderMonitoring.check[Pure, Pure, Unit](loadFrom, exampleReadyCheckConfig, exampleDatabricks, LoadAuthMethod.NoCreds, ()).run state must beEqualTo(expectedState) result must beRight.like { diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala index 288a186b1..1a5933e72 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala @@ -63,7 +63,8 @@ class LoadSpec extends Specification { ColumnsToCopy(List.empty), ColumnsToSkip(List.empty), TypesInfo.Shredded(List.empty), - LoadAuthMethod.NoCreds + LoadAuthMethod.NoCreds, + () ) ), LogEntry.Sql(Statement.ShreddedCopy(info, Compression.Gzip)), @@ -73,12 +74,14 @@ class LoadSpec extends Specification { ) val result = Load - .load[Pure, Pure]( + .load[Pure, Pure, Unit]( SpecHelpers.validCliConfig.config, 
LoadSpec.setStageNoOp, Pure.unit, LoadSpec.dataDiscoveryWithOrigin, - LoadAuthMethod.NoCreds + LoadAuthMethod.NoCreds, + (), + PureDAO.DummyTarget ) .runS @@ -104,12 +107,14 @@ class LoadSpec extends Specification { ) val result = Load - .load[Pure, Pure]( + .load[Pure, Pure, Unit]( SpecHelpers.validCliConfig.config, LoadSpec.setStageNoOp, Pure.unit, LoadSpec.dataDiscoveryWithOrigin, - LoadAuthMethod.NoCreds + LoadAuthMethod.NoCreds, + (), + PureDAO.DummyTarget ) .runS @@ -142,7 +147,8 @@ class LoadSpec extends Specification { ColumnsToCopy(List.empty), ColumnsToSkip(List.empty), TypesInfo.Shredded(List.empty), - LoadAuthMethod.NoCreds + LoadAuthMethod.NoCreds, + () ) ), LogEntry.Sql(Statement.ShreddedCopy(info, Compression.Gzip)), @@ -157,7 +163,8 @@ class LoadSpec extends Specification { ColumnsToCopy(List.empty), ColumnsToSkip(List.empty), TypesInfo.Shredded(List.empty), - LoadAuthMethod.NoCreds + LoadAuthMethod.NoCreds, + () ) ), LogEntry.Sql(Statement.ShreddedCopy(info, Compression.Gzip)), @@ -166,12 +173,14 @@ class LoadSpec extends Specification { PureTransaction.CommitMessage ) val result = Load - .load[Pure, Pure]( + .load[Pure, Pure, Unit]( SpecHelpers.validCliConfig.config, LoadSpec.setStageNoOp, Pure.unit, LoadSpec.dataDiscoveryWithOrigin, - LoadAuthMethod.NoCreds + LoadAuthMethod.NoCreds, + (), + PureDAO.DummyTarget ) .runS @@ -206,12 +215,14 @@ class LoadSpec extends Specification { // to throw an ad-hoc exception within a transaction ) val result = Load - .load[Pure, Pure]( + .load[Pure, Pure, Unit]( SpecHelpers.validCliConfig.config, LoadSpec.setStageNoOp, Pure.unit, LoadSpec.dataDiscoveryWithOrigin, - LoadAuthMethod.NoCreds + LoadAuthMethod.NoCreds, + (), + PureDAO.DummyTarget ) .runS diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala index 28e69649b..d8802cd61 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala @@ -12,13 +12,17 @@ */ package com.snowplowanalytics.snowplow.rdbloader.test +import cats.Monad import cats.data.NonEmptyList import cats.implicits._ + import doobie.{Fragment, Read} + import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.iglu.schemaddl.StringUtils import com.snowplowanalytics.iglu.schemaddl.migrations.{FlatSchema, Migration => SchemaMigration, SchemaList} import com.snowplowanalytics.iglu.schemaddl.redshift.generators.DdlGenerator + import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo import com.snowplowanalytics.snowplow.rdbloader.{LoadStatements, LoaderError} import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression @@ -68,7 +72,7 @@ object PureDAO { val init: PureDAO = custom(getResult) - def interpreter(results: PureDAO, tgt: Target = DummyTarget): DAO[Pure] = new DAO[Pure] { + def interpreter(results: PureDAO): DAO[Pure] = new DAO[Pure] { def executeUpdate(sql: Statement, purpose: DAO.Purpose): Pure[Int] = results.executeUpdate(sql) @@ -80,18 +84,17 @@ object PureDAO { def executeQueryOption[A](query: Statement)(implicit A: Read[A]): Pure[Option[A]] = results.executeQuery.asInstanceOf[Statement => Pure[Option[A]]](query) - - def target: Target = tgt } - val DummyTarget = new Target { + val DummyTarget = new Target[Unit] { def toFragment(statement: Statement): 
Fragment = Fragment.const0(statement.toString) def getLoadStatements( discovery: DataDiscovery, eventTableColumns: EventTableColumns, - loadAuthMethod: LoadAuthMethod + loadAuthMethod: LoadAuthMethod, + i: Unit ): LoadStatements = NonEmptyList( Statement.EventsCopy( @@ -100,13 +103,16 @@ object PureDAO { ColumnsToCopy(List.empty), ColumnsToSkip(List.empty), TypesInfo.Shredded(List.empty), - loadAuthMethod + loadAuthMethod, + i ), discovery.shreddedTypes.map { shredded => Statement.ShreddedCopy(shredded, Compression.Gzip) } ) + def initQuery[F[_]: DAO: Monad]: F[Unit] = Monad[F].unit + def getManifest: Statement = Statement.CreateTable(Fragment.const0("CREATE manifest")) diff --git a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Main.scala b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Main.scala index 3f1fc7027..b7bbb9719 100644 --- a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Main.scala +++ b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Main.scala @@ -19,7 +19,7 @@ import com.snowplowanalytics.snowplow.rdbloader.Runner object Main extends IOApp { def run(args: List[String]): IO[ExitCode] = - Runner.run[IO]( + Runner.run[IO, Unit]( args, Redshift.build, "rdb-loader-redshift" diff --git a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala index 2a5d7227d..7efbacafc 100644 --- a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala +++ b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala @@ -14,6 +14,7 @@ package com.snowplowanalytics.snowplow.loader.redshift import java.sql.Timestamp +import cats.Monad import cats.data.NonEmptyList import doobie.Fragment @@ -38,6 +39,7 @@ import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Entity, Ite import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, Manifest, Statement, Target} import com.snowplowanalytics.snowplow.rdbloader.cloud.LoadAuthService.LoadAuthMethod import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} +import com.snowplowanalytics.snowplow.rdbloader.dsl.DAO import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable object Redshift { @@ -46,14 +48,14 @@ object Redshift { val AlertingTempTableName = "rdb_folder_monitoring" - def build(config: Config[StorageTarget]): Either[String, Target] = { + def build(config: Config[StorageTarget]): Either[String, Target[Unit]] = { (config.cloud, config.storage) match { case (c: Config.Cloud.AWS, storage: StorageTarget.Redshift) => val region = c.region val roleArn = storage.roleArn val schema = storage.schema val maxError = storage.maxError - val result = new Target { + val result = new Target[Unit] { override val requiresEventsColumns: Boolean = false @@ -82,7 +84,8 @@ object Redshift { override def getLoadStatements( discovery: DataDiscovery, eventTableColumns: EventTableColumns, - loadAuthMethod: LoadAuthMethod + loadAuthMethod: LoadAuthMethod, + i: Unit ): LoadStatements = { val shreddedStatements = discovery.shreddedTypes .filterNot(_.isAtomic) @@ -94,11 +97,14 @@ object Redshift { ColumnsToCopy(AtomicColumns.Columns), ColumnsToSkip.none, discovery.typesInfo, - loadAuthMethod + loadAuthMethod, + i ) NonEmptyList(atomic, 
shreddedStatements) } + override def initQuery[F[_]: DAO: Monad]: F[Unit] = Monad[F].unit + override def createTable(schemas: SchemaList): Block = { val subschemas = FlatSchema.extractProperties(schemas) val tableName = StringUtils.getTableName(schemas.latest) @@ -124,12 +130,12 @@ object Redshift { val frTableName = Fragment.const(AlertingTempTableName) val frManifest = Fragment.const(s"${schema}.manifest") sql"SELECT run_id FROM $frTableName MINUS SELECT base FROM $frManifest" - case Statement.FoldersCopy(source, _) => + case Statement.FoldersCopy(source, _, _) => val frTableName = Fragment.const(AlertingTempTableName) val frRoleArn = Fragment.const0(s"aws_iam_role=$roleArn") val frPath = Fragment.const0(source) sql"COPY $frTableName FROM '$frPath' CREDENTIALS '$frRoleArn' DELIMITER '$EventFieldSeparator'" - case Statement.EventsCopy(path, compression, columnsToCopy, _, _, _) => + case Statement.EventsCopy(path, compression, columnsToCopy, _, _, _, _) => // For some reasons Redshift JDBC doesn't handle interpolation in COPY statements val frTableName = Fragment.const(EventsTable.withSchema(schema)) val frPath = Fragment.const0(Common.entityPathFull(path, Common.AtomicType)) @@ -258,6 +264,8 @@ object Redshift { throw new IllegalStateException("Redshift Loader does not use vacuum events table statement") case Statement.VacuumManifest => throw new IllegalStateException("Redshift Loader does not use vacuum manifest statement") + case Statement.StagePath(_) => + throw new IllegalStateException("Redshift Loader does not use StagePath statement") } private def qualify(tableName: String): String = diff --git a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala index 38260261a..20d22025d 100644 --- a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala +++ b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala @@ -71,6 +71,6 @@ class RedshiftSpec extends Specification { } object RedshiftSpec { - val redshift: Target = + val redshift: Target[Unit] = Redshift.build(validConfig).getOrElse(throw new RuntimeException("Invalid config")) } diff --git a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/db/MigrationSpec.scala b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/db/MigrationSpec.scala index 5e8471e3b..cb35d12d4 100644 --- a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/db/MigrationSpec.scala +++ b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/db/MigrationSpec.scala @@ -96,7 +96,7 @@ class MigrationSpec extends Specification { LogEntry.Message("Table created") ) - val (state, value) = Migration.build[Pure, Pure](input).run + val (state, value) = Migration.build[Pure, Pure, Unit](input, PureDAO.DummyTarget).run state.getLog must beEqualTo(expected) value must beRight.like { case Migration(preTransaction, inTransaction) => @@ -164,7 +164,7 @@ class MigrationSpec extends Specification { LogEntry.Message("Table created") ) - val (state, value) = Migration.build[Pure, Pure](input).run + val (state, value) = Migration.build[Pure, Pure, Unit](input, PureDAO.DummyTarget).run state.getLog must beEqualTo(expected) value must beRight.like { case Migration(preTransaction, inTransaction) => 
preTransaction must beEmpty diff --git a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Main.scala b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Main.scala index 01c08c261..8930ace33 100644 --- a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Main.scala +++ b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Main.scala @@ -18,7 +18,7 @@ import com.snowplowanalytics.snowplow.rdbloader.Runner object Main extends IOApp { def run(args: List[String]): IO[ExitCode] = - Runner.run[IO]( + Runner.run[IO, Snowflake.InitQueryResult]( args, Snowflake.build, "rdb-loader-snowflake" diff --git a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala index 732954403..ae937a470 100644 --- a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala +++ b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala @@ -12,12 +12,15 @@ */ package com.snowplowanalytics.snowplow.loader.snowflake +import cats.implicits._ +import cats.Monad import cats.data.NonEmptyList import doobie.Fragment import doobie.implicits._ import io.circe.syntax._ +import io.circe.parser.parse import com.snowplowanalytics.iglu.core.SchemaKey @@ -39,6 +42,7 @@ import com.snowplowanalytics.snowplow.rdbloader.db.{Manifest, Statement, Target} import com.snowplowanalytics.snowplow.rdbloader.cloud.LoadAuthService.LoadAuthMethod import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable +import com.snowplowanalytics.snowplow.rdbloader.dsl.DAO object Snowflake { @@ -47,11 +51,11 @@ object Snowflake { val AlertingTempTableName = "rdb_folder_monitoring" val TempTableColumn = "enriched_data" - def build(config: Config[StorageTarget]): Either[String, Target] = { + def build(config: Config[StorageTarget]): Either[String, Target[InitQueryResult]] = { config.storage match { case tgt: StorageTarget.Snowflake => val schema = tgt.schema - val result = new Target { + val result = new Target[InitQueryResult] { override val requiresEventsColumns: Boolean = false @@ -73,7 +77,8 @@ object Snowflake { override def getLoadStatements( discovery: DataDiscovery, eventTableColumns: EventTableColumns, - loadAuthMethod: LoadAuthMethod + loadAuthMethod: LoadAuthMethod, + initQueryResult: InitQueryResult ): LoadStatements = { val columnsToCopy = ColumnsToCopy.fromDiscoveredData(discovery) loadAuthMethod match { @@ -85,7 +90,8 @@ object Snowflake { columnsToCopy, ColumnsToSkip.none, discovery.typesInfo, - loadAuthMethod + loadAuthMethod, + initQueryResult ) ) case c: LoadAuthMethod.TempCreds => @@ -103,6 +109,14 @@ object Snowflake { } } + override def initQuery[F[_]: DAO: Monad]: F[InitQueryResult] = + for { + eventStagePath <- + DAO[F].executeQueryList[StageDescRow](Statement.StagePath(tgt.transformedStage.get.name)).map(StageDescRow.path) + folderMonitoringStagePath <- + DAO[F].executeQueryList[StageDescRow](Statement.StagePath(tgt.folderMonitoringStage.get.name)).map(StageDescRow.path) + } yield InitQueryResult(eventStagePath, folderMonitoringStagePath) + // Technically, Snowflake Loader cannot create new tables override def createTable(schemas: SchemaList): Block = { 
val entity = Entity.Table(schema, schemas.latest.schemaKey) @@ -127,7 +141,7 @@ object Snowflake { val frTableName = Fragment.const(qualify(AlertingTempTableName)) val frManifest = Fragment.const(qualify(Manifest.Name)) sql"SELECT run_id FROM $frTableName MINUS SELECT base FROM $frManifest" - case Statement.FoldersCopy(source, loadAuthMethod) => + case Statement.FoldersCopy(source, loadAuthMethod, initQueryResult: InitQueryResult) => val frTableName = Fragment.const(qualify(AlertingTempTableName)) val frPath = loadAuthMethod match { case LoadAuthMethod.NoCreds => @@ -135,7 +149,7 @@ object Snowflake { val stage = tgt.folderMonitoringStage.getOrElse( throw new IllegalStateException("Folder Monitoring is launched without monitoring stage being provided") ) - val afterStage = findPathAfterStage(stage, source) + val afterStage = findPathAfterStage(stage, initQueryResult.folderMonitoringStagePath, source) Fragment.const0(s"@${qualify(stage.name)}/$afterStage") case _: LoadAuthMethod.TempCreds => Fragment.const0(source) @@ -145,12 +159,15 @@ object Snowflake { |FROM $frPath $frCredentials |FILE_FORMAT = (TYPE = CSV)""".stripMargin - case Statement.EventsCopy(path, _, columns, _, typesInfo, _) => + case Statement.FoldersCopy(_, _, _) => + throw new IllegalStateException("Init query result has wrong format in FoldersCopy") + + case Statement.EventsCopy(path, _, columns, _, typesInfo, _, initQueryResult: InitQueryResult) => // This is validated on config decoding stage val stage = tgt.transformedStage.getOrElse( throw new IllegalStateException("Transformed stage is tried to be used without being provided") ) - val afterStage = findPathAfterStage(stage, path) + val afterStage = findPathAfterStage(stage, initQueryResult.transformedStagePath, path) val frPath = Fragment.const0(s"@${qualify(stage.name)}/$afterStage/output=good/") val frCopy = Fragment.const0(s"${qualify(EventsTable.MainName)}(${columnsForCopy(columns)})") val frSelectColumns = Fragment.const0(columnsForSelect(columns)) @@ -163,6 +180,12 @@ object Snowflake { |$frFileFormat |$frOnError""".stripMargin + case Statement.EventsCopy(_, _, _, _, _, _, _) => + throw new IllegalStateException("Init query result has wrong format in EventsCopy") + + case Statement.StagePath(stage) => + Fragment.const0(s"DESC STAGE ${qualify(stage)}") + case Statement.CreateTempEventTable(table) => val frTableName = Fragment.const(qualify(table)) val frTableColumn = Fragment.const(TempTableColumn) @@ -322,13 +345,48 @@ object Snowflake { ) } - private def findPathAfterStage(stage: StorageTarget.Snowflake.Stage, pathToLoad: BlobStorage.Folder): String = - stage.location match { + private def findPathAfterStage( + stage: StorageTarget.Snowflake.Stage, + queriedStagePath: Option[BlobStorage.Folder], + pathToLoad: BlobStorage.Folder + ): String = { + val stagePath = stage.location.orElse(queriedStagePath) + stagePath match { case Some(loc) => pathToLoad.diff(loc) match { case Some(diff) => diff - case None => throw new IllegalStateException(s"The stage's path and the path to load don't match: $pathToLoad") + case None => throw new IllegalStateException(s"The stage path and the path to load don't match: $pathToLoad") } - case None => pathToLoad.folderName + case None => throw new IllegalStateException(s"The stage path cannot be empty") } + } + + /** + * Contains results of the queries sent to warehouse when application is initialized. The queries + * sent with Snowflake Loader is 'DESC STAGE'. 
They are sent to find out the paths of the transformed and
+   * folder monitoring stages. During the load operation, the stage path is compared with the load
+   * path to make sure the data can be loaded with the given stage.
+   */
+  case class InitQueryResult(transformedStagePath: Option[BlobStorage.Folder], folderMonitoringStagePath: Option[BlobStorage.Folder])
+
+  /**
+   * Represents one row of the DESC STAGE result. Every row contains info about a different
+   * property of the stage.
+   */
+  case class StageDescRow(
+    parent_property: String,
+    property: String,
+    property_type: String,
+    property_value: String,
+    property_default: String
+  )
+
+  object StageDescRow {
+    def path(l: List[StageDescRow]): Option[BlobStorage.Folder] =
+      for {
+        loc <- l.find(_.parent_property == "STAGE_LOCATION")
+        l <- parse(loc.property_value).flatMap(_.as[List[String]]).toOption
+        p <- l.headOption
+      } yield BlobStorage.Folder.coerce(p)
+  }
 }
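
A minimal, self-contained sketch (not part of the patch) of the staging-path resolution these Snowflake changes introduce: when the loader starts, initQuery runs DESC STAGE for the transformed and folder-monitoring stages, StageDescRow.path pulls the stage's base location out of the STAGE_LOCATION row, and findPathAfterStage later strips that base from the run folder to build the @stage/sub-path used in COPY INTO. The bucket name, run id and the StagePathSketch wrapper below are made up for illustration; only the parsing and prefix-stripping logic mirrors the patch.

import io.circe.parser.parse

// Shape of one row returned by `DESC STAGE <name>` (same fields as the patch's StageDescRow)
final case class StageDescRow(
  parent_property: String,
  property: String,
  property_type: String,
  property_value: String,
  property_default: String
)

object StagePathSketch {

  // Mirrors StageDescRow.path: STAGE_LOCATION's property_value is a JSON array of URLs,
  // and its first element is taken as the stage's base location
  def stageLocation(rows: List[StageDescRow]): Option[String] =
    for {
      row  <- rows.find(_.parent_property == "STAGE_LOCATION")
      urls <- parse(row.property_value).flatMap(_.as[List[String]]).toOption
      url  <- urls.headOption
    } yield url

  // Rough equivalent of findPathAfterStage: the run folder must live under the stage base,
  // and the remainder becomes the sub-path appended to `@<stage name>/` in the COPY statement
  def pathAfterStage(stageBase: String, runFolder: String): Option[String] =
    if (runFolder.startsWith(stageBase)) Some(runFolder.stripPrefix(stageBase)) else None

  def main(args: Array[String]): Unit = {
    // Hypothetical DESC STAGE output for a stage created on s3://acme-transformed/output/
    val descStage = List(
      StageDescRow("STAGE_LOCATION", "URL", "String", """["s3://acme-transformed/output/"]""", "")
    )
    val base      = stageLocation(descStage)                                // Some("s3://acme-transformed/output/")
    val runFolder = "s3://acme-transformed/output/run=2022-11-07-00-00-00/" // made-up run id
    println(base.flatMap(pathAfterStage(_, runFolder)))                     // Some("run=2022-11-07-00-00-00/")
  }
}

Note that findPathAfterStage still prefers a stage location given in the config (stage.location.orElse(queriedStagePath)); the queried path is only a fallback, which is what lets the staging location paths be auto-configured when they are omitted from the loader config.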