diff --git a/config/databricks.config.reference.hocon b/config/databricks.config.reference.hocon
index 7e8cb7237..de95751bb 100644
--- a/config/databricks.config.reference.hocon
+++ b/config/databricks.config.reference.hocon
@@ -33,6 +33,29 @@
   "httpPath": "/databricks/http/path",
   # User agent name for Databricks connection. Optional, default value "snowplow-rdbloader-oss"
   "userAgent": "snowplow-rdbloader-oss"
+
+  # Optional, default method is 'NoCreds'
+  # Specifies the auth method to use with the 'COPY INTO' statement.
+  "loadAuthMethod": {
+    # With 'NoCreds', no credentials will be passed to the 'COPY INTO' statement.
+    # The Databricks cluster needs to have permission to access the transformer
+    # output S3 bucket. More information can be found here:
+    # https://docs.databricks.com/administration-guide/cloud-configurations/aws/instance-profiles.html
+    "type": "NoCreds"
+  }
+  #"loadAuthMethod": {
+  #  # With 'TempCreds', temporary credentials will be created for every
+  #  # load operation and these temporary credentials will be passed to
+  #  # the 'COPY INTO' statement. This way, the Databricks cluster doesn't need
+  #  # permission to access the transformer output S3 bucket.
+  #  # That access is provided by the temporary credentials instead.
+  #  "type": "TempCreds"
+  #  # IAM role that is used while creating the temporary credentials
+  #  # The created credentials will allow access to the resources specified in the given role.
+  #  # In our case, 's3:GetObject*', 's3:ListBucket', and 's3:GetBucketLocation' permissions
+  #  # for the transformer output S3 bucket should be specified in the role.
+  #  "roleArn": "arn:aws:iam::123456789:role/role_name"
+  #}
 },

 "schedules": {
@@ -173,6 +196,9 @@
   # How long loading (actual COPY statements) can take before considering Redshift unhealthy
   # Without any progress (i.e. different subfolder) within this period, loader
   # will abort the transaction
+  # If the 'TempCreds' load auth method is used, this value will also be used as the session duration
+  # of the temporary credentials.
In that case, it can't be greater than maximum session duration + # of IAM role used for temporary credentials "loading": "1 hour", # How long non-loading steps (such as ALTER TABLE or metadata queries) can take diff --git a/modules/databricks-loader/src/main/resources/application.conf b/modules/databricks-loader/src/main/resources/application.conf index 33341e041..2ab2cfbf7 100644 --- a/modules/databricks-loader/src/main/resources/application.conf +++ b/modules/databricks-loader/src/main/resources/application.conf @@ -3,5 +3,9 @@ "type": "databricks" "catalog": "hive_metastore" "userAgent": "snowplow-rdbloader-oss" + "loadAuthMethod": { + "type": "NoCreds" + "roleSessionName": "rdb_loader" + } } } diff --git a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala index 613c510f9..1dff2d2b7 100644 --- a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala +++ b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala @@ -20,6 +20,7 @@ import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns} import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Entity} import com.snowplowanalytics.snowplow.rdbloader.db.{Manifest, Statement, Target} +import com.snowplowanalytics.snowplow.rdbloader.db.AuthService.LoadAuthMethod import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable import doobie.Fragment @@ -47,11 +48,11 @@ object Databricks { override def extendTable(info: ShreddedType.Info): Option[Block] = None - override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns): LoadStatements = { - val toCopy = ColumnsToCopy.fromDiscoveredData(discovery) + override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns, loadAuthMethod: LoadAuthMethod): LoadStatements = { + val toCopy = ColumnsToCopy.fromDiscoveredData(discovery) val toSkip = ColumnsToSkip(getEntityColumnsPresentInDbOnly(eventTableColumns, toCopy)) - NonEmptyList.one(Statement.EventsCopy(discovery.base, discovery.compression, toCopy, toSkip)) + NonEmptyList.one(Statement.EventsCopy(discovery.base, discovery.compression, toCopy, toSkip, loadAuthMethod)) } override def createTable(schemas: SchemaList): Block = Block(Nil, Nil, Entity.Table(tgt.schema, schemas.latest.schemaKey)) @@ -90,24 +91,27 @@ object Databricks { val frTableName = Fragment.const(qualify(AlertingTempTableName)) val frManifest = Fragment.const(qualify(Manifest.Name)) sql"SELECT run_id FROM $frTableName MINUS SELECT base FROM $frManifest" - case Statement.FoldersCopy(source) => + case Statement.FoldersCopy(source, loadAuthMethod) => val frTableName = Fragment.const(qualify(AlertingTempTableName)) val frPath = Fragment.const0(source) + val frAuth = loadAuthMethodFragment(loadAuthMethod) + sql"""COPY INTO $frTableName - FROM (SELECT _C0::VARCHAR(512) RUN_ID FROM '$frPath') - FILEFORMAT = CSV"""; - case Statement.EventsCopy(path, _, toCopy, toSkip) => + FROM (SELECT _C0::VARCHAR(512) RUN_ID FROM '$frPath' $frAuth) + FILEFORMAT = CSV""" + case Statement.EventsCopy(path, _, toCopy, toSkip, loadAuthMethod) => val 
frTableName = Fragment.const(qualify(EventsTable.MainName)) val frPath = Fragment.const0(path.append("output=good")) val nonNulls = toCopy.names.map(_.value) val nulls = toSkip.names.map(c => s"NULL AS ${c.value}") val currentTimestamp = "current_timestamp() AS load_tstamp" - val allColumns = (nonNulls ::: nulls) :+ currentTimestamp - - val frSelectColumns = Fragment.const0(allColumns.mkString(",")) + val allColumns = (nonNulls ::: nulls) :+ currentTimestamp + val frAuth = loadAuthMethodFragment(loadAuthMethod) + val frSelectColumns = Fragment.const0(allColumns.mkString(",")) + sql"""COPY INTO $frTableName FROM ( - SELECT $frSelectColumns from '$frPath' + SELECT $frSelectColumns from '$frPath' $frAuth ) FILEFORMAT = PARQUET COPY_OPTIONS('MERGESCHEMA' = 'TRUE')"""; @@ -175,4 +179,12 @@ object Databricks { .filter(name => name.value.startsWith(UnstructPrefix) || name.value.startsWith(ContextsPrefix)) .diff(toCopy.names) } + + private def loadAuthMethodFragment(loadAuthMethod: LoadAuthMethod): Fragment = + loadAuthMethod match { + case LoadAuthMethod.NoCreds => + Fragment.empty + case LoadAuthMethod.TempCreds(awsAccessKey, awsSecretKey, awsSessionToken) => + Fragment.const0(s"WITH ( CREDENTIAL (AWS_ACCESS_KEY = '$awsAccessKey', AWS_SECRET_KEY = '$awsSecretKey', AWS_SESSION_TOKEN = '$awsSessionToken') )") + } } diff --git a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala index ace24f91b..db3ae1dae 100644 --- a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala +++ b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala @@ -21,6 +21,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.SnowplowEnt import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnName, ColumnsToCopy, ColumnsToSkip} import com.snowplowanalytics.snowplow.rdbloader.db.{Statement, Target} +import com.snowplowanalytics.snowplow.rdbloader.db.AuthService.LoadAuthMethod import scala.concurrent.duration.DurationInt import org.specs2.mutable.Specification @@ -50,8 +51,8 @@ class DatabricksSpec extends Specification { val discovery = DataDiscovery(baseFolder, shreddedTypes, Compression.Gzip) - target.getLoadStatements(discovery, eventsColumns) should be like { - case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip), Nil) => + target.getLoadStatements(discovery, eventsColumns, LoadAuthMethod.NoCreds) should be like { + case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip, LoadAuthMethod.NoCreds), Nil) => path must beEqualTo(baseFolder) compression must beEqualTo(Compression.Gzip) @@ -85,12 +86,30 @@ class DatabricksSpec extends Specification { ColumnName("unstruct_event_com_acme_bbb_1"), ColumnName("contexts_com_acme_yyy_1"), )) - val statement = Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip) + val statement = Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, LoadAuthMethod.NoCreds) target.toFragment(statement).toString must beLike { case sql => sql must contain("SELECT app_id,unstruct_event_com_acme_aaa_1,contexts_com_acme_xxx_1,NULL AS unstruct_event_com_acme_bbb_1,NULL AS contexts_com_acme_yyy_1,current_timestamp() AS load_tstamp 
from 's3://somewhere/path/output=good/'") } } + + "create sql with credentials for loading" in { + val toCopy = ColumnsToCopy(List( + ColumnName("app_id"), + ColumnName("unstruct_event_com_acme_aaa_1"), + ColumnName("contexts_com_acme_xxx_1") + )) + val toSkip = ColumnsToSkip(List( + ColumnName("unstruct_event_com_acme_bbb_1"), + ColumnName("contexts_com_acme_yyy_1"), + )) + val loadAuthMethod = LoadAuthMethod.TempCreds("testAccessKey", "testSecretKey", "testSessionToken") + val statement = Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip, loadAuthMethod) + + target.toFragment(statement).toString must beLike { case sql => + sql must contain(s"SELECT app_id,unstruct_event_com_acme_aaa_1,contexts_com_acme_xxx_1,NULL AS unstruct_event_com_acme_bbb_1,NULL AS contexts_com_acme_yyy_1,current_timestamp() AS load_tstamp from 's3://somewhere/path/output=good/' WITH ( CREDENTIAL (AWS_ACCESS_KEY = '${loadAuthMethod.awsAccessKey}', AWS_SECRET_KEY = '${loadAuthMethod.awsSecretKey}', AWS_SESSION_TOKEN = '${loadAuthMethod.awsSessionToken}') )") + } + } } } @@ -113,7 +132,8 @@ object DatabricksSpec { "some/path", StorageTarget.PasswordConfig.PlainText("xxx"), None, - "useragent" + "useragent", + StorageTarget.LoadAuthMethod.NoCreds ), Config.Schedules(Nil), Config.Timeouts(1.minute, 1.minute, 1.minute), diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala index 803f0ea32..39ed7d1a4 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala @@ -15,11 +15,11 @@ package com.snowplowanalytics.snowplow.rdbloader import scala.concurrent.duration._ import cats.{Apply, Monad} import cats.implicits._ -import cats.effect.{Clock, Concurrent, MonadThrow, Timer} +import cats.effect.{Clock, Concurrent, MonadThrow, Timer, ContextShift} import fs2.Stream import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} import com.snowplowanalytics.snowplow.rdbloader.db.Columns._ -import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, HealthCheck, Manifest, Statement, Control => DbControl} +import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, HealthCheck, Manifest, Statement, Control => DbControl, AuthService} import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, NoOperation, Retries} import com.snowplowanalytics.snowplow.rdbloader.dsl.{AWS, Cache, DAO, FolderMonitoring, Iglu, Logging, Monitoring, StateMonitoring, Transaction} import com.snowplowanalytics.snowplow.rdbloader.loading.{EventsTable, Load, Stage, TargetCheck} @@ -47,10 +47,10 @@ object Loader { * Unlike `F` it cannot pull `A` out of DB (perform a transaction), but just * claim `A` is needed and `C[A]` later can be materialized into `F[A]` */ - def run[F[_]: Transaction[*[_], C]: Concurrent: AWS: Clock: Iglu: Cache: Logging: Timer: Monitoring, + def run[F[_]: Transaction[*[_], C]: Concurrent: AWS: Clock: Iglu: Cache: Logging: Timer: Monitoring: ContextShift, C[_]: DAO: MonadThrow: Logging](config: Config[StorageTarget], control: Control[F]): F[Unit] = { val folderMonitoring: Stream[F, Unit] = - FolderMonitoring.run[C, F](config.monitoring.folders, config.readyCheck, config.storage, control.isBusy) + FolderMonitoring.run[F, C](config.monitoring.folders, config.readyCheck, config.storage, config.timeouts, config.region.name, 
control.isBusy) val noOpScheduling: Stream[F, Unit] = NoOperation.run(config.schedules.noOperation, control.makePaused, control.signal.map(_.loading)) val healthCheck = @@ -88,7 +88,7 @@ object Loader { * A primary loading processing, pulling information from discovery streams * (SQS and retry queue) and performing the load operation itself */ - private def loadStream[F[_]: Transaction[*[_], C]: Concurrent: AWS: Iglu: Cache: Logging: Timer: Monitoring, + private def loadStream[F[_]: Transaction[*[_], C]: Concurrent: AWS: Iglu: Cache: Logging: Timer: Monitoring: ContextShift, C[_]: DAO: MonadThrow: Logging](config: Config[StorageTarget], control: Control[F]): Stream[F, Unit] = { val sqsDiscovery: DiscoveryStream[F] = DataDiscovery.discover[F](config, control.incrementMessages, control.isBusy) @@ -106,7 +106,7 @@ object Loader { * over to `Load`. A primary function handling the global state - everything * downstream has access only to `F` actions, instead of whole `Control` object */ - private def processDiscovery[F[_]: Transaction[*[_], C]: Concurrent: Iglu: Logging: Timer: Monitoring, + private def processDiscovery[F[_]: Transaction[*[_], C]: Concurrent: Iglu: Logging: Timer: Monitoring: ContextShift, C[_]: DAO: MonadThrow: Logging](config: Config[StorageTarget], control: Control[F]) (discovery: DataDiscovery.WithOrigin): F[Unit] = { val folder = discovery.origin.base @@ -122,7 +122,8 @@ object Loader { val loading: F[Unit] = backgroundCheck { for { start <- Clock[F].instantNow - result <- Load.load[F, C](config, setStageC, control.incrementAttempts, discovery) + loadAuth <- AuthService.getLoadAuthMethod[F](config.storage.loadAuthMethod, config.region.name, config.timeouts.loading) + result <- Load.load[F, C](config, setStageC, control.incrementAttempts, discovery, loadAuth) attempts <- control.getAndResetAttempts _ <- result match { case Right(ingested) => diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala index 62d1ebae0..bdd71b6ee 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala @@ -48,6 +48,7 @@ sealed trait StorageTarget extends Product with Serializable { def withAutoCommit: Boolean = false def connectionUrl: String def properties: Properties + def loadAuthMethod: StorageTarget.LoadAuthMethod } object StorageTarget { @@ -93,6 +94,8 @@ object StorageTarget { } props } + + def loadAuthMethod: LoadAuthMethod = LoadAuthMethod.NoCreds } final case class Databricks( @@ -103,7 +106,8 @@ object StorageTarget { httpPath: String, password: PasswordConfig, sshTunnel: Option[TunnelConfig], - userAgent: String + userAgent: String, + loadAuthMethod: LoadAuthMethod ) extends StorageTarget { override def username: String = "token" @@ -194,6 +198,8 @@ object StorageTarget { "Snowflake config requires either jdbcHost or both account and region".asLeft } } + + def loadAuthMethod: LoadAuthMethod = LoadAuthMethod.NoCreds } object Snowflake { @@ -300,6 +306,13 @@ object StorageTarget { } } + sealed trait LoadAuthMethod extends Product with Serializable + + object LoadAuthMethod { + final case object NoCreds extends LoadAuthMethod + final case class TempCreds(roleArn: String, roleSessionName: String) extends LoadAuthMethod + } + /** * SSH configuration, enabling target to be loaded 
though tunnel
   *
@@ -347,6 +360,26 @@
       case Snowflake.AbortStatement => "ABORT_STATEMENT"
     }
 
+  implicit def loadAuthMethodDecoder: Decoder[LoadAuthMethod] =
+    Decoder.instance { cur =>
+      val typeCur = cur.downField("type")
+      typeCur.as[String].map(_.toLowerCase) match {
+        case Right("nocreds") =>
+          Right(LoadAuthMethod.NoCreds)
+        case Right("tempcreds") =>
+          cur.as[LoadAuthMethod.TempCreds]
+        case Right(other) =>
+          Left(DecodingFailure(s"Load auth method of type $other is not supported yet. Supported types: 'NoCreds', 'TempCreds'", typeCur.history))
+        case Left(DecodingFailure(_, List(CursorOp.DownField("type")))) =>
+          Left(DecodingFailure("Cannot find 'type' string in load auth method", typeCur.history))
+        case Left(other) =>
+          Left(other)
+      }
+    }
+
+  implicit def tempCredsAuthMethodDecoder: Decoder[LoadAuthMethod.TempCreds] =
+    deriveDecoder[LoadAuthMethod.TempCreds]
+
   implicit def storageTargetDecoder: Decoder[StorageTarget] =
     Decoder.instance { cur =>
       val typeCur = cur.downField("type")
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/AuthService.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/AuthService.scala
new file mode 100644
index 000000000..febb60a97
--- /dev/null
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/AuthService.scala
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2014-2022 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.rdbloader.db
+
+import scala.concurrent.duration.FiniteDuration
+
+import cats.effect.{Concurrent, ContextShift}
+
+import cats.implicits._
+
+import software.amazon.awssdk.regions.Region
+import software.amazon.awssdk.services.sts.StsAsyncClient
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest
+
+import com.snowplowanalytics.aws.Common
+import com.snowplowanalytics.snowplow.rdbloader.config.StorageTarget
+
+object AuthService {
+  /**
+   * Auth method that is used with the COPY INTO statement
+   */
+  sealed trait LoadAuthMethod
+
+  object LoadAuthMethod {
+    /**
+     * Specifies an auth method that doesn't use credentials.
+     * The destination should already be configured by some other means
+     * for copying from the transformer output bucket.
+     */
+    final case object NoCreds extends LoadAuthMethod
+
+    /**
+     * Specifies an auth method that passes temporary credentials to the COPY INTO statement
+     */
+    final case class TempCreds(awsAccessKey: String, awsSecretKey: String, awsSessionToken: String) extends LoadAuthMethod
+  }
+
+  /**
+   * Get the load auth method according to the value specified in the config.
+   * If the temporary credentials method is specified in the config, it will fetch temporary credentials
+   * by sending a request to the STS service and then return them.
+ */ + def getLoadAuthMethod[F[_]: Concurrent: ContextShift](authMethodConfig: StorageTarget.LoadAuthMethod, + region: String, + sessionDuration: FiniteDuration): F[LoadAuthMethod] = + authMethodConfig match { + case StorageTarget.LoadAuthMethod.NoCreds => Concurrent[F].pure(LoadAuthMethod.NoCreds) + case StorageTarget.LoadAuthMethod.TempCreds(roleArn, roleSessionName) => + for { + stsAsyncClient <- Concurrent[F].delay( + StsAsyncClient.builder() + .region(Region.of(region)) + .build() + ) + assumeRoleRequest <- Concurrent[F].delay( + AssumeRoleRequest.builder() + .durationSeconds(sessionDuration.toSeconds.toInt) + .roleArn(roleArn) + .roleSessionName(roleSessionName) + .build() + ) + response <- Common.fromCompletableFuture( + Concurrent[F].delay(stsAsyncClient.assumeRole(assumeRoleRequest)) + ) + creds = response.credentials() + } yield LoadAuthMethod.TempCreds(creds.accessKeyId(), creds.secretAccessKey(), creds.sessionToken()) + } +} diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala index 1a17048e6..8bc60e264 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala @@ -16,6 +16,7 @@ import doobie.Fragment import com.snowplowanalytics.snowplow.rdbloader.common.{LoaderMessage, S3} import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip} +import com.snowplowanalytics.snowplow.rdbloader.db.AuthService.LoadAuthMethod import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable @@ -51,13 +52,14 @@ object Statement { case object CreateAlertingTempTable extends Statement case object DropAlertingTempTable extends Statement case object FoldersMinusManifest extends Statement - case class FoldersCopy(source: S3.Folder) extends Statement + case class FoldersCopy(source: S3.Folder, loadAuthMethod: LoadAuthMethod) extends Statement // Loading case class EventsCopy(path: S3.Folder, compression: Compression, columnsToCopy: ColumnsToCopy, - columnsToSkip: ColumnsToSkip) extends Statement with Loading { + columnsToSkip: ColumnsToSkip, + loadAuthMethod: LoadAuthMethod) extends Statement with Loading { def table: String = EventsTable.MainName } case class ShreddedCopy(shreddedType: ShreddedType, compression: Compression) extends Statement with Loading { diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala index 7b489ad33..ad3a1cf8f 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala @@ -16,6 +16,7 @@ import com.snowplowanalytics.iglu.schemaddl.migrations.{SchemaList, Migration => import com.snowplowanalytics.snowplow.rdbloader.LoadStatements import com.snowplowanalytics.snowplow.rdbloader.db.Columns.EventTableColumns import com.snowplowanalytics.snowplow.rdbloader.db.Migration.Block +import com.snowplowanalytics.snowplow.rdbloader.db.AuthService.LoadAuthMethod import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import 
doobie.Fragment @@ -38,7 +39,8 @@ trait Target { * @param eventTableColumns TODO */ def getLoadStatements(discovery: DataDiscovery, - eventTableColumns: EventTableColumns): LoadStatements + eventTableColumns: EventTableColumns, + loadAuthMethod: LoadAuthMethod): LoadStatements /** Get DDL of a manifest table */ def getManifest: Statement diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala index 58b4fd32d..145b22cb1 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala @@ -21,7 +21,7 @@ import scala.concurrent.duration._ import cats.{Functor, Applicative, Monad, MonadThrow} import cats.implicits._ -import cats.effect.{Timer, Sync, Concurrent} +import cats.effect.{Timer, Sync, Concurrent, ContextShift} import cats.effect.concurrent.{ Ref, Semaphore } import doobie.util.Get @@ -32,6 +32,8 @@ import fs2.text.utf8Encode import com.snowplowanalytics.snowplow.rdbloader.common.S3 import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} import com.snowplowanalytics.snowplow.rdbloader.db.Statement._ +import com.snowplowanalytics.snowplow.rdbloader.db.AuthService +import com.snowplowanalytics.snowplow.rdbloader.db.AuthService.LoadAuthMethod import com.snowplowanalytics.snowplow.rdbloader.dsl.Monitoring.AlertPayload import com.snowplowanalytics.snowplow.rdbloader.loading.TargetCheck @@ -144,13 +146,15 @@ object FolderMonitoring { * @param loadFrom list shredded folders * @return potentially empty list of alerts */ - def check[C[_]: DAO: Monad, F[_]: MonadThrow: AWS: Transaction[*[_], C]: Timer: Logging](loadFrom: S3.Folder, - readyCheck: Config.Retries, - storageTarget: StorageTarget): F[List[AlertPayload]] = { + def check[F[_]: MonadThrow: AWS: Transaction[*[_], C]: Timer: Logging, + C[_]: DAO: Monad](loadFrom: S3.Folder, + readyCheck: Config.Retries, + storageTarget: StorageTarget, + loadAuthMethod: LoadAuthMethod): F[List[AlertPayload]] = { val getBatches = for { _ <- DAO[C].executeUpdate(DropAlertingTempTable, DAO.Purpose.NonLoading) _ <- DAO[C].executeUpdate(CreateAlertingTempTable, DAO.Purpose.NonLoading) - _ <- DAO[C].executeUpdate(FoldersCopy(loadFrom), DAO.Purpose.NonLoading) + _ <- DAO[C].executeUpdate(FoldersCopy(loadFrom, loadAuthMethod), DAO.Purpose.NonLoading) onlyS3Batches <- DAO[C].executeQueryList[S3.Folder](FoldersMinusManifest) } yield onlyS3Batches @@ -182,14 +186,16 @@ object FolderMonitoring { * If some configurations are not provided - just prints a warning. * Resulting stream has to be running in background. 
*/ - def run[C[_]: DAO: Monad, - F[_]: Concurrent: Timer: AWS: Transaction[*[_], C]: Logging: Monitoring: MonadThrow](foldersCheck: Option[Config.Folders], - readyCheck: Config.Retries, - storageTarget: StorageTarget, - isBusy: Stream[F, Boolean]): Stream[F, Unit] = + def run[F[_]: Concurrent: Timer: AWS: Transaction[*[_], C]: Logging: Monitoring: MonadThrow: ContextShift, + C[_]: DAO: Monad](foldersCheck: Option[Config.Folders], + readyCheck: Config.Retries, + storageTarget: StorageTarget, + timeouts: Config.Timeouts, + region: String, + isBusy: Stream[F, Boolean]): Stream[F, Unit] = foldersCheck match { case Some(folders) => - stream[C, F](folders, readyCheck, storageTarget, isBusy) + stream[F, C](folders, readyCheck, storageTarget, timeouts, region, isBusy) case None => Stream.eval[F, Unit](Logging[F].info("Configuration for monitoring.folders hasn't been provided - monitoring is disabled")) } @@ -202,11 +208,13 @@ object FolderMonitoring { * @param readyCheck configuration for target ready check * @param isBusy discrete stream signalling when folders monitoring should not work */ - def stream[C[_]: DAO: Monad, - F[_]: Transaction[*[_], C]: Concurrent: Timer: AWS: Logging: Monitoring: MonadThrow](folders: Config.Folders, - readyCheck: Config.Retries, - storageTarget: StorageTarget, - isBusy: Stream[F, Boolean]): Stream[F, Unit] = + def stream[F[_]: Transaction[*[_], C]: Concurrent: Timer: AWS: Logging: Monitoring: MonadThrow: ContextShift, + C[_]: DAO: Monad](folders: Config.Folders, + readyCheck: Config.Retries, + storageTarget: StorageTarget, + timeouts: Config.Timeouts, + region: String, + isBusy: Stream[F, Boolean]): Stream[F, Unit] = Stream.eval((Semaphore[F](1), Ref.of(0)).tupled).flatMap { case (lock, failed) => getOutputKeys[F](folders) .pauseWhen(isBusy) @@ -216,16 +224,17 @@ object FolderMonitoring { val sinkAndCheck = Logging[F].info("Monitoring shredded folders") *> sinkFolders[F](folders.since, folders.until, folders.transformerOutput, outputFolder).ifM( - check[C, F](outputFolder, readyCheck, storageTarget) - .flatMap { alerts => - alerts.traverse_ { payload => + for { + loadAuth <- AuthService.getLoadAuthMethod[F](storageTarget.loadAuthMethod, region, timeouts.loading) + alerts <- check[F, C](outputFolder, readyCheck, storageTarget, loadAuth) + _ <- alerts.traverse_ { payload => val warn = payload.base match { case Some(folder) => Logging[F].warning(s"${payload.message} $folder") case None => Logging[F].error(s"${payload.message} with unknown path. Invalid state!") } warn *> Monitoring[F].alert(payload) } - }, + } yield (), Logging[F].info(s"No folders were found in ${folders.transformerOutput}. 
Skipping manifest check") ) *> failed.set(0) diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala index b5415f980..ed415edc3 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala @@ -24,6 +24,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage import com.snowplowanalytics.snowplow.rdbloader.common.S3 import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget } import com.snowplowanalytics.snowplow.rdbloader.db.{ Control, Migration, Manifest } +import com.snowplowanalytics.snowplow.rdbloader.db.AuthService.LoadAuthMethod import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery import com.snowplowanalytics.snowplow.rdbloader.dsl.{Iglu, Transaction, Logging, Monitoring, DAO} import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.Metrics @@ -75,12 +76,13 @@ object Load { (config: Config[StorageTarget], setStage: Stage => C[Unit], incrementAttempt: F[Unit], - discovery: DataDiscovery.WithOrigin): F[Either[AlertPayload, Option[Instant]]] = + discovery: DataDiscovery.WithOrigin, + loadAuthMethod: LoadAuthMethod): F[Either[AlertPayload, Option[Instant]]] = for { _ <- TargetCheck.blockUntilReady[F, C](config.readyCheck, config.storage) migrations <- Migration.build[F, C](discovery.discovery) _ <- Transaction[F, C].run(setStage(Stage.MigrationPre) *> migrations.preTransaction) - transaction = getTransaction[C](setStage, discovery)(migrations.inTransaction) + transaction = getTransaction[C](setStage, discovery, loadAuthMethod)(migrations.inTransaction) result <- Retry.retryLoad(config.retries, incrementAttempt, Transaction[F, C].transact(transaction)) } yield result @@ -95,7 +97,8 @@ object Load { * @return either alert payload in case of an existing folder or ingestion timestamp of the current folder */ def getTransaction[F[_]: Logging: Monad: DAO](setStage: Stage => F[Unit], - discovery: DataDiscovery.WithOrigin) + discovery: DataDiscovery.WithOrigin, + loadAuthMethod: LoadAuthMethod) (inTransactionMigrations: F[Unit]): F[Either[AlertPayload, Option[Instant]]] = for { _ <- setStage(Stage.ManifestCheck) @@ -112,7 +115,7 @@ object Load { Logging[F].info(s"Loading transaction for ${discovery.origin.base} has started") *> setStage(Stage.MigrationIn) *> inTransactionMigrations *> - run[F](setLoading, discovery.discovery) *> + run[F](setLoading, discovery.discovery, loadAuthMethod) *> setStage(Stage.Committing) *> Manifest.add[F](discovery.origin.toManifestItem) *> Manifest @@ -128,11 +131,12 @@ object Load { * @return block of VACUUM and ANALYZE statements to execute them out of a main transaction */ def run[F[_]: Monad: Logging: DAO](setLoading: String => F[Unit], - discovery: DataDiscovery): F[Unit] = + discovery: DataDiscovery, + loadAuthMethod: LoadAuthMethod): F[Unit] = for { _ <- Logging[F].info(s"Loading ${discovery.base}") existingEventTableColumns <- if (DAO[F].target.requiresEventsColumns) Control.getColumns[F](EventsTable.MainName) else Nil.pure[F] - _ <- DAO[F].target.getLoadStatements(discovery, existingEventTableColumns).traverse_ { statement => + _ <- DAO[F].target.getLoadStatements(discovery, existingEventTableColumns, loadAuthMethod).traverse_ { statement => Logging[F].info(statement.title) *> setLoading(statement.table) *> 
DAO[F].executeUpdate(statement, DAO.Purpose.Loading).void diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala index 337715329..b48502f68 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala @@ -23,6 +23,7 @@ import io.circe.syntax._ import com.snowplowanalytics.snowplow.rdbloader.common.S3 import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} import com.snowplowanalytics.snowplow.rdbloader.db.Statement +import com.snowplowanalytics.snowplow.rdbloader.db.AuthService.LoadAuthMethod import com.snowplowanalytics.snowplow.rdbloader.dsl.Monitoring.AlertPayload.Severity import com.snowplowanalytics.snowplow.rdbloader.test.{Pure, PureTransaction, PureDAO, TestState, PureAWS, PureTimer, PureOps, PureLogging} @@ -43,7 +44,7 @@ class FolderMonitoringSpec extends Specification { val expectedState = TestState(List( PureTransaction.CommitMessage, TestState.LogEntry.Sql(Statement.FoldersMinusManifest), - TestState.LogEntry.Sql(Statement.FoldersCopy(S3.Folder.coerce("s3://bucket/shredded/"))), + TestState.LogEntry.Sql(Statement.FoldersCopy(S3.Folder.coerce("s3://bucket/shredded/"), LoadAuthMethod.NoCreds)), TestState.LogEntry.Sql(Statement.CreateAlertingTempTable), TestState.LogEntry.Sql(Statement.DropAlertingTempTable), PureTransaction.StartMessage, @@ -54,7 +55,7 @@ class FolderMonitoringSpec extends Specification { Monitoring.AlertPayload(Monitoring.Application, Some(S3.Folder.coerce("s3://bucket/shredded/run=2021-07-09-12-30-00/")), Severity.Warning, "Incomplete shredding", Map.empty) ) - val (state, result) = FolderMonitoring.check[Pure, Pure](loadFrom, exampleReadyCheckConfig, exampleDatabricks).run + val (state, result) = FolderMonitoring.check[Pure, Pure](loadFrom, exampleReadyCheckConfig, exampleDatabricks, LoadAuthMethod.NoCreds).run state must beEqualTo(expectedState) result must beRight.like { @@ -74,7 +75,7 @@ class FolderMonitoringSpec extends Specification { val expectedState = TestState(List( PureTransaction.CommitMessage, TestState.LogEntry.Sql(Statement.FoldersMinusManifest), - TestState.LogEntry.Sql(Statement.FoldersCopy(S3.Folder.coerce("s3://bucket/shredded/"))), + TestState.LogEntry.Sql(Statement.FoldersCopy(S3.Folder.coerce("s3://bucket/shredded/"), LoadAuthMethod.NoCreds)), TestState.LogEntry.Sql(Statement.CreateAlertingTempTable), TestState.LogEntry.Sql(Statement.DropAlertingTempTable), PureTransaction.StartMessage, @@ -85,7 +86,7 @@ class FolderMonitoringSpec extends Specification { Monitoring.AlertPayload(Monitoring.Application, Some(S3.Folder.coerce("s3://bucket/shredded/run=2021-07-09-12-30-00/")), Severity.Warning, "Unloaded batch", Map.empty) ) - val (state, result) = FolderMonitoring.check[Pure, Pure](loadFrom, exampleReadyCheckConfig, exampleDatabricks).run + val (state, result) = FolderMonitoring.check[Pure, Pure](loadFrom, exampleReadyCheckConfig, exampleDatabricks, LoadAuthMethod.NoCreds).run state must beEqualTo(expectedState) result must beRight.like { @@ -215,6 +216,7 @@ object FolderMonitoringSpec { "http/path", StorageTarget.PasswordConfig.PlainText("Supersecret1"), None, - "snowplow-rdbloader-oss" + "snowplow-rdbloader-oss", + StorageTarget.LoadAuthMethod.NoCreds ) } diff --git 
a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala index 2e0152b92..8608dc37b 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala @@ -25,6 +25,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.config.Semver import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Iglu, Logging, Transaction} import com.snowplowanalytics.snowplow.rdbloader.db.{Manifest, Statement} +import com.snowplowanalytics.snowplow.rdbloader.db.AuthService.LoadAuthMethod import org.specs2.mutable.Specification import com.snowplowanalytics.snowplow.rdbloader.SpecHelpers._ import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip} @@ -52,14 +53,14 @@ class LoadSpec extends Specification { PureTransaction.StartMessage, LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)), - LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, ColumnsToCopy(List.empty), ColumnsToSkip(List.empty))), + LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, ColumnsToCopy(List.empty), ColumnsToSkip(List.empty), LoadAuthMethod.NoCreds)), LogEntry.Sql(Statement.ShreddedCopy(info,Compression.Gzip)), LogEntry.Sql(Statement.ManifestAdd(LoadSpec.dataDiscoveryWithOrigin.origin.toManifestItem)), LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)), PureTransaction.CommitMessage, ) - val result = Load.load[Pure, Pure](SpecHelpers.validCliConfig.config, LoadSpec.setStageNoOp, Pure.unit, LoadSpec.dataDiscoveryWithOrigin).runS + val result = Load.load[Pure, Pure](SpecHelpers.validCliConfig.config, LoadSpec.setStageNoOp, Pure.unit, LoadSpec.dataDiscoveryWithOrigin, LoadAuthMethod.NoCreds).runS result.getLog must beEqualTo(expected) } @@ -83,7 +84,7 @@ class LoadSpec extends Specification { PureTransaction.CommitMessage, ) - val result = Load.load[Pure, Pure](SpecHelpers.validCliConfig.config, LoadSpec.setStageNoOp, Pure.unit, LoadSpec.dataDiscoveryWithOrigin).runS + val result = Load.load[Pure, Pure](SpecHelpers.validCliConfig.config, LoadSpec.setStageNoOp, Pure.unit, LoadSpec.dataDiscoveryWithOrigin, LoadAuthMethod.NoCreds).runS result.getLog must beEqualTo(expected) } @@ -105,19 +106,19 @@ class LoadSpec extends Specification { PureTransaction.StartMessage, LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)), - LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, ColumnsToCopy(List.empty), ColumnsToSkip(List.empty))), + LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, ColumnsToCopy(List.empty), ColumnsToSkip(List.empty), LoadAuthMethod.NoCreds)), LogEntry.Sql(Statement.ShreddedCopy(info,Compression.Gzip)), PureTransaction.RollbackMessage, LogEntry.Message("SLEEP 30000000000 nanoseconds"), PureTransaction.StartMessage, LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)), - LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, ColumnsToCopy(List.empty), ColumnsToSkip(List.empty))), + LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, ColumnsToCopy(List.empty), ColumnsToSkip(List.empty), LoadAuthMethod.NoCreds)), 
LogEntry.Sql(Statement.ShreddedCopy(info,Compression.Gzip)), LogEntry.Sql(Statement.ManifestAdd(LoadSpec.dataDiscoveryWithOrigin.origin.toManifestItem)), LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)), PureTransaction.CommitMessage, ) - val result = Load.load[Pure, Pure](SpecHelpers.validCliConfig.config, LoadSpec.setStageNoOp, Pure.unit, LoadSpec.dataDiscoveryWithOrigin).runS + val result = Load.load[Pure, Pure](SpecHelpers.validCliConfig.config, LoadSpec.setStageNoOp, Pure.unit, LoadSpec.dataDiscoveryWithOrigin, LoadAuthMethod.NoCreds).runS result.getLog must beEqualTo(expected) } @@ -150,7 +151,7 @@ class LoadSpec extends Specification { PureTransaction.CommitMessage, // TODO: this is potentially dangerous, we need // to throw an ad-hoc exception within a transaction ) - val result = Load.load[Pure, Pure](SpecHelpers.validCliConfig.config, LoadSpec.setStageNoOp, Pure.unit, LoadSpec.dataDiscoveryWithOrigin).runS + val result = Load.load[Pure, Pure](SpecHelpers.validCliConfig.config, LoadSpec.setStageNoOp, Pure.unit, LoadSpec.dataDiscoveryWithOrigin, LoadAuthMethod.NoCreds).runS result.getLog must beEqualTo(expected) } diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala index 0972a6480..69b36276c 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala @@ -24,6 +24,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig. import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns} import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Item} import com.snowplowanalytics.snowplow.rdbloader.db.{Migration, Statement, Target} +import com.snowplowanalytics.snowplow.rdbloader.db.AuthService.LoadAuthMethod import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import com.snowplowanalytics.snowplow.rdbloader.dsl.DAO @@ -90,9 +91,9 @@ object PureDAO { def toFragment(statement: Statement): Fragment = Fragment.const0(statement.toString) - def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns): LoadStatements = + def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns, loadAuthMethod: LoadAuthMethod): LoadStatements = NonEmptyList( - Statement.EventsCopy(discovery.base, Compression.Gzip, ColumnsToCopy(List.empty), ColumnsToSkip(List.empty)), + Statement.EventsCopy(discovery.base, Compression.Gzip, ColumnsToCopy(List.empty), ColumnsToSkip(List.empty), loadAuthMethod), discovery.shreddedTypes.map { shredded => Statement.ShreddedCopy(shredded, Compression.Gzip) } diff --git a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala index ec87137fd..c89d7b17f 100644 --- a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala +++ b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala @@ -25,6 +25,7 @@ import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns} import 
com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Entity, Item, NoPreStatements, NoStatements} import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, Manifest, Statement, Target} +import com.snowplowanalytics.snowplow.rdbloader.db.AuthService.LoadAuthMethod import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable import doobie.Fragment @@ -69,13 +70,13 @@ object Redshift { override def extendTable(info: ShreddedType.Info): Option[Block] = throw new IllegalStateException("Redshift Loader does not support loading wide row") - override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns): LoadStatements = { + override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns, loadAuthMethod: LoadAuthMethod): LoadStatements = { val shreddedStatements = discovery .shreddedTypes .filterNot(_.isAtomic) .map(shreddedType => Statement.ShreddedCopy(shreddedType, discovery.compression)) - val atomic = Statement.EventsCopy(discovery.base, discovery.compression, ColumnsToCopy(AtomicColumns.Columns), ColumnsToSkip.none) + val atomic = Statement.EventsCopy(discovery.base, discovery.compression, ColumnsToCopy(AtomicColumns.Columns), ColumnsToSkip.none, loadAuthMethod) NonEmptyList(atomic, shreddedStatements) } @@ -104,12 +105,12 @@ object Redshift { val frTableName = Fragment.const(AlertingTempTableName) val frManifest = Fragment.const(s"${schema}.manifest") sql"SELECT run_id FROM $frTableName MINUS SELECT base FROM $frManifest" - case Statement.FoldersCopy(source) => + case Statement.FoldersCopy(source, _) => val frTableName = Fragment.const(AlertingTempTableName) val frRoleArn = Fragment.const0(s"aws_iam_role=$roleArn") val frPath = Fragment.const0(source) sql"COPY $frTableName FROM '$frPath' CREDENTIALS '$frRoleArn' DELIMITER '$EventFieldSeparator'" - case Statement.EventsCopy(path, compression, columnsToCopy, _) => + case Statement.EventsCopy(path, compression, columnsToCopy, _, _) => // For some reasons Redshift JDBC doesn't handle interpolation in COPY statements val frTableName = Fragment.const(EventsTable.withSchema(schema)) val frPath = Fragment.const0(Common.entityPathFull(path, Common.AtomicType)) diff --git a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala index 68f075789..cb17742f1 100644 --- a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala +++ b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala @@ -24,6 +24,7 @@ import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns} import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Entity, Item} import com.snowplowanalytics.snowplow.rdbloader.db.{Manifest, Statement, Target} +import com.snowplowanalytics.snowplow.rdbloader.db.AuthService.LoadAuthMethod import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable import doobie.Fragment @@ -62,13 +63,14 @@ object Snowflake { Some(Block(List(addColumn), Nil, Entity.Column(info))) } - override def 
getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns): LoadStatements = + override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns, loadAuthMethod: LoadAuthMethod): LoadStatements = NonEmptyList.one( Statement.EventsCopy( discovery.base, discovery.compression, ColumnsToCopy.fromDiscoveredData(discovery), - ColumnsToSkip.none + ColumnsToSkip.none, + loadAuthMethod ) ) @@ -96,14 +98,14 @@ object Snowflake { val frTableName = Fragment.const(qualify(AlertingTempTableName)) val frManifest = Fragment.const(qualify(Manifest.Name)) sql"SELECT run_id FROM $frTableName MINUS SELECT base FROM $frManifest" - case Statement.FoldersCopy(source) => + case Statement.FoldersCopy(source, _) => val frTableName = Fragment.const(qualify(AlertingTempTableName)) // This is validated on config decoding stage val stageName = monitoringStage.getOrElse(throw new IllegalStateException("Folder Monitoring is launched without monitoring stage being provided")) val frPath = Fragment.const0(s"@$schema.$stageName/${source.folderName}") sql"COPY INTO $frTableName FROM $frPath FILE_FORMAT = (TYPE = CSV)" - case Statement.EventsCopy(path, _, columnsToCopy, _) => { + case Statement.EventsCopy(path, _, columnsToCopy, _, _) => { def columnsForCopy: String = columnsToCopy.names.map(_.value).mkString(",") + ",load_tstamp" def columnsForSelect: String = columnsToCopy.names.map(c => s"$$1:${c.value}").mkString(",") + ",current_timestamp()" diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 2f0fc2530..edba69617 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -144,6 +144,7 @@ object Dependencies { val aws2sns = "software.amazon.awssdk" % "sns" % V.aws2 val aws2kinesis = "software.amazon.awssdk" % "kinesis" % V.aws2 val aws2regions = "software.amazon.awssdk" % "regions" % V.aws2 + val aws2sts = "software.amazon.awssdk" % "sts" % V.aws2 val protobuf = "com.google.protobuf" % "protobuf-java" % V.protobuf val commons = "commons-io" % "commons-io" % V.commons val kafkaClients = "org.apache.kafka" % "kafka-clients" % V.kafkaClients @@ -189,6 +190,7 @@ object Dependencies { val loaderDependencies = Seq( slf4j, ssm, + aws2sts, dynamodb, jSch, sentry,