Databricks loader: Support for generated columns (close #951)
istreeter committed Jun 25, 2022
1 parent b1f5039 commit 2e49096
Showing 9 changed files with 176 additions and 26 deletions.
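
In short: before building its COPY INTO statement, the Databricks loader now asks the warehouse which columns already exist in the events table (via the new SHOW columns statement), and every existing unstruct_event_*/contexts_* column that the current batch does not provide is selected as an explicit NULL, while columns the loader does not manage (for example generated columns) are left out of the statement entirely. A new Target.requiresEventsColumns flag lets the Load step fetch those columns only for targets that need them: Databricks returns true, while Redshift, Snowflake and the test DAO return false. Judging by the new DatabricksSpec below, the emitted statement looks roughly like this (a sketch only: the column names come from the test fixture, the qualified table name assumes qualify() renders catalog.schema.table, and file-format options outside this diff are omitted):

    COPY INTO hive_metastore.snowplow.events
    FROM (
      SELECT app_id,unstruct_event_com_acme_aaa_1,contexts_com_acme_xxx_1,
             NULL AS unstruct_event_com_acme_bbb_1,NULL AS contexts_com_acme_yyy_1,
             current_timestamp() AS load_tstamp from 's3://somewhere/path/output=good/'
    )
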
@@ -32,6 +32,8 @@ object Databricks {

val AlertingTempTableName = "rdb_folder_monitoring"
val ManifestName = "manifest"
val UnstructPrefix = "unstruct_event_"
val ContextsPrefix = "contexts_"

def build(config: Config[StorageTarget]): Either[String, Target] = {
config.storage match {
@@ -42,8 +44,13 @@ object Databricks {

def extendTable(info: ShreddedType.Info): Option[Block] = None

def getLoadStatements(discovery: DataDiscovery): LoadStatements =
NonEmptyList.one(Statement.EventsCopy(discovery.base, discovery.compression, getColumns(discovery)))
def getLoadStatements(discovery: DataDiscovery, eventsColumns: List[String]): LoadStatements = {
val toCopy = getColumns(discovery)
val toSkip = eventsColumns
.filter(c => c.startsWith(UnstructPrefix) || c.startsWith(ContextsPrefix))
.diff(toCopy)
NonEmptyList.one(Statement.EventsCopy(discovery.base, discovery.compression, toCopy, toSkip))
}

def getColumns(discovery: DataDiscovery): List[String] = {
val atomicColumns = AtomicColumns.Columns
@@ -79,6 +86,8 @@ object Databricks {
|""".stripMargin)
)

def requiresEventsColumns: Boolean = true

def toFragment(statement: Statement): Fragment =
statement match {
case Statement.Select1 => sql"SELECT 1"
@@ -101,10 +110,13 @@ object Databricks {
sql"""COPY INTO $frTableName
FROM (SELECT _C0::VARCHAR(512) RUN_ID FROM '$frPath')
FILEFORMAT = CSV""";
case Statement.EventsCopy(path, _, columns) =>
case Statement.EventsCopy(path, _, toCopy, toSkip) =>
val frTableName = Fragment.const(qualify(EventsTable.MainName))
val frPath = Fragment.const0(s"$path/output=good")
val frSelectColumns = Fragment.const0(columns.mkString(",") + ", current_timestamp() as load_tstamp")
val frPath = Fragment.const0(path.append("output=good"))
val allColumns = toCopy ::: toSkip.map (c => s"NULL AS $c") ::: List(
"current_timestamp() AS load_tstamp"
)
val frSelectColumns = Fragment.const0(allColumns.mkString(","))
sql"""COPY INTO $frTableName
FROM (
SELECT $frSelectColumns from '$frPath'
@@ -124,9 +136,10 @@ object Databricks {
case _: Statement.RenameTable =>
throw new IllegalStateException("Databricks Loader does not support migrations")
case Statement.SetSearchPath =>
throw new IllegalStateException("Databricks Loader does not support migrations")
case _: Statement.GetColumns =>
throw new IllegalStateException("Databricks Loader does not support migrations")
Fragment.const0(s"USE SCHEMA ${tgt.catalog}.${tgt.schema}")
case Statement.GetColumns(tableName) =>
val qualifiedName = Fragment.const(qualify(tableName))
sql"SHOW columns in $qualifiedName"
case Statement.ManifestAdd(message) =>
val tableName = Fragment.const(qualify(ManifestName))
val types = message.types.asJson.noSpaces
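For illustration, here is a small sketch (not an excerpt from the source) of how the toCopy/toSkip split above behaves, using the hypothetical column names from the DatabricksSpec that follows:

    // Columns already present in the warehouse events table (result of SHOW columns)
    val eventsColumns = List(
      "unstruct_event_com_acme_aaa_1", // also in this batch         -> copied from the files
      "contexts_com_acme_xxx_1",       // missing from this batch    -> loaded as explicit NULL
      "not_a_snowplow_column"          // not a shredded-type column -> not mentioned in the COPY
    )

    // Columns the current batch provides, according to DataDiscovery (getColumns)
    val toCopy = List("app_id", "unstruct_event_com_acme_aaa_1")

    // Existing shredded-type columns the batch does not provide
    val toSkip = eventsColumns
      .filter(c => c.startsWith("unstruct_event_") || c.startsWith("contexts_"))
      .diff(toCopy)
    // toSkip == List("contexts_com_acme_xxx_1")
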
@@ -0,0 +1,125 @@
/*
* Copyright (c) 2012-2022 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.loader.databricks

import cats.data.NonEmptyList

import com.snowplowanalytics.snowplow.rdbloader.common.S3
import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType}
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
import com.snowplowanalytics.snowplow.rdbloader.common.config.Region
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.SnowplowEntity
import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
import com.snowplowanalytics.snowplow.rdbloader.db.{Statement, Target}

import scala.concurrent.duration.DurationInt

import org.specs2.mutable.Specification


class DatabricksSpec extends Specification {
import DatabricksSpec._

"getLoadStatements" should {

"create LoadStatements with columns to copy and columns to skip" in {

val eventsColumns = List(
"unstruct_event_com_acme_aaa_1",
"unstruct_event_com_acme_bbb_1",
"contexts_com_acme_xxx_1",
"contexts_com_acme_yyy_1",
"not_a_snowplow_column"
)

val shreddedTypes = List(
ShreddedType.Widerow(ShreddedType.Info(baseFolder, "com_acme", "aaa", 1, SnowplowEntity.SelfDescribingEvent)),
ShreddedType.Widerow(ShreddedType.Info(baseFolder, "com_acme", "ccc", 1, SnowplowEntity.SelfDescribingEvent)),
ShreddedType.Widerow(ShreddedType.Info(baseFolder, "com_acme", "yyy", 1, SnowplowEntity.Context)),
ShreddedType.Widerow(ShreddedType.Info(baseFolder, "com_acme", "zzz", 1, SnowplowEntity.Context))
)

val discovery = DataDiscovery(baseFolder, shreddedTypes, Compression.Gzip)

target.getLoadStatements(discovery, eventsColumns) should be like {
case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip), Nil) =>
path must beEqualTo(baseFolder)
compression must beEqualTo(Compression.Gzip)

columnsToCopy must contain(allOf(
"unstruct_event_com_acme_aaa_1",
"unstruct_event_com_acme_ccc_1",
"contexts_com_acme_yyy_1",
"contexts_com_acme_zzz_1",
))

columnsToCopy must not contain("unstruct_event_com_acme_bbb_1")
columnsToCopy must not contain("contexts_com_acme_xxx_1")
columnsToCopy must not contain("not_a_snowplow_column")

columnsToSkip must beEqualTo(List(
"unstruct_event_com_acme_bbb_1",
"contexts_com_acme_xxx_1",
))
}
}
}

"toFragment" should {
"create sql for loading" in {
val toCopy = List(
"app_id",
"unstruct_event_com_acme_aaa_1",
"contexts_com_acme_xxx_1"
)
val toSkip = List(
"unstruct_event_com_acme_bbb_1",
"contexts_com_acme_yyy_1"
)
val statement = Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip)

target.toFragment(statement).toString must beLike { case sql =>
sql must contain("SELECT app_id,unstruct_event_com_acme_aaa_1,contexts_com_acme_xxx_1,NULL AS unstruct_event_com_acme_bbb_1,NULL AS contexts_com_acme_yyy_1,current_timestamp() AS load_tstamp from 's3://somewhere/path/output=good/'")
}
}
}
}

object DatabricksSpec {

val baseFolder: S3.Folder =
S3.Folder.coerce("s3://somewhere/path")

val target: Target = Databricks.build(Config(
Region("eu-central-1"),
None,
Config.Monitoring(None, None, Config.Metrics(None, None, 1.minute), None, None, None),
"my-queue.fifo",
None,
StorageTarget.Databricks(
"host",
"hive_metastore",
"snowplow",
443,
"some/path",
StorageTarget.PasswordConfig.PlainText("xxx"),
None,
"useragent"
),
Config.Schedules(Nil),
Config.Timeouts(1.minute, 1.minute, 1.minute),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None)
)).right.get

}
@@ -17,6 +17,7 @@ import doobie.Fragment
import com.snowplowanalytics.snowplow.rdbloader.common.{S3, LoaderMessage}
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType
import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable


/**
@@ -53,8 +54,8 @@ object Statement {
case class FoldersCopy(source: S3.Folder) extends Statement

// Loading
case class EventsCopy(path: S3.Folder, compression: Compression, columns: List[String]) extends Statement with Loading {
def table: String = "events"
case class EventsCopy(path: S3.Folder, compression: Compression, columnsToCopy: List[String], columnsToSkip: List[String]) extends Statement with Loading {
def table: String = EventsTable.MainName
}
case class ShreddedCopy(shreddedType: ShreddedType, compression: Compression) extends Statement with Loading {
def table: String = shreddedType.info.getName
@@ -35,8 +35,10 @@ trait Target {
* Transform `DataDiscovery` into `LoadStatements`
* The statements could be either single statement (only `events` table)
* or multi-statement (`events` plus shredded types)
* @param discovery the discovered folder with the data to load
* @param eventsColumns columns that already exist in the warehouse `events` table
*/
def getLoadStatements(discovery: DataDiscovery): LoadStatements
def getLoadStatements(discovery: DataDiscovery, eventsColumns: List[String]): LoadStatements

/** Get DDL of a manifest table */
def getManifest: Statement
@@ -49,4 +51,7 @@

/** Add a new column into `events`, i.e. extend a wide row. Unlike `updateTable` it always operates on `events` table */
def extendTable(info: ShreddedType.Info): Option[Block]

/** Whether the target needs to know existing columns in the events table */
def requiresEventsColumns: Boolean
}
@@ -23,7 +23,7 @@ import cats.effect.{Timer, Clock}
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.S3
import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget }
import com.snowplowanalytics.snowplow.rdbloader.db.{ Migration, Manifest }
import com.snowplowanalytics.snowplow.rdbloader.db.{ Control, Migration, Manifest }
import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery
import com.snowplowanalytics.snowplow.rdbloader.dsl.{Iglu, Transaction, Logging, Monitoring, DAO}
import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.Metrics
@@ -131,7 +131,8 @@ object Load {
discovery: DataDiscovery): F[Unit] =
for {
_ <- Logging[F].info(s"Loading ${discovery.base}")
_ <- DAO[F].target.getLoadStatements(discovery).traverse_ { statement =>
cols <- if (DAO[F].target.requiresEventsColumns) Control.getColumns[F](EventsTable.MainName) else Nil.pure[F]
_ <- DAO[F].target.getLoadStatements(discovery, cols).traverse_ { statement =>
Logging[F].info(statement.title) *>
setLoading(statement.table) *>
DAO[F].executeUpdate(statement, DAO.Purpose.Loading).void
@@ -58,7 +58,7 @@ class LoadSpec extends Specification {

PureTransaction.StartMessage,
LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)),
LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, List.empty)),
LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, List.empty, List.empty)),
LogEntry.Sql(Statement.ShreddedCopy(info,Compression.Gzip)),
LogEntry.Sql(Statement.ManifestAdd(LoadSpec.dataDiscoveryWithOrigin.origin.toManifestItem)),
LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)),
@@ -111,13 +111,13 @@ class LoadSpec extends Specification {

PureTransaction.StartMessage,
LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)),
LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, List.empty)),
LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, List.empty, List.empty)),
LogEntry.Sql(Statement.ShreddedCopy(info,Compression.Gzip)),
PureTransaction.RollbackMessage,
LogEntry.Message("SLEEP 30000000000 nanoseconds"),
PureTransaction.StartMessage,
LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)),
LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, List.empty)),
LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, List.empty, List.empty)),
LogEntry.Sql(Statement.ShreddedCopy(info,Compression.Gzip)),
LogEntry.Sql(Statement.ManifestAdd(LoadSpec.dataDiscoveryWithOrigin.origin.toManifestItem)),
LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)),
@@ -93,9 +93,9 @@ object PureDAO {
def toFragment(statement: Statement): Fragment =
Fragment.const0(statement.toString)

def getLoadStatements(discovery: DataDiscovery): LoadStatements =
def getLoadStatements(discovery: DataDiscovery, eventsColumns: List[String]): LoadStatements =
NonEmptyList(
Statement.EventsCopy(discovery.base, Compression.Gzip, List.empty),
Statement.EventsCopy(discovery.base, Compression.Gzip, List.empty, List.empty),
discovery.shreddedTypes.map { shredded =>
Statement.ShreddedCopy(shredded, Compression.Gzip)
}
@@ -117,5 +117,7 @@ object PureDAO {
val entity = Migration.Entity.Table("public", schemas.latest.schemaKey)
Block(Nil, List(Item.CreateTable(Fragment.const0(createTable.toDdl))), entity)
}

def requiresEventsColumns: Boolean = false
}
}
@@ -70,14 +70,14 @@ object Redshift {
def extendTable(info: ShreddedType.Info): Option[Block] =
throw new IllegalStateException("Redshift Loader does not support loading wide row")

def getLoadStatements(discovery: DataDiscovery): LoadStatements = {
def getLoadStatements(discovery: DataDiscovery, eventsColumns: List[String]): LoadStatements = {
val shreddedStatements = discovery
.shreddedTypes
.filterNot(_.isAtomic)
.map(shreddedType => Statement.ShreddedCopy(shreddedType, discovery.compression))
// Since EventsCopy is used only for atomic events in Redshift Loader,
// the column fields of EventsCopy aren't needed, therefore they are set to empty lists.
val atomic = Statement.EventsCopy(discovery.base, discovery.compression, List.empty)
val atomic = Statement.EventsCopy(discovery.base, discovery.compression, List.empty, List.empty)
NonEmptyList(atomic, shreddedStatements)
}

@@ -91,6 +91,8 @@ object Redshift {
def getManifest: Statement =
Statement.CreateTable(Fragment.const0(getManifestDef(schema).render))

def requiresEventsColumns: Boolean = false

def toFragment(statement: Statement): Fragment =
statement match {
case Statement.Select1 => sql"SELECT 1"
@@ -111,7 +113,7 @@ object Redshift {
val frRoleArn = Fragment.const0(s"aws_iam_role=$roleArn")
val frPath = Fragment.const0(source)
sql"COPY $frTableName FROM '$frPath' CREDENTIALS '$frRoleArn' DELIMITER '$EventFieldSeparator'"
case Statement.EventsCopy(path, compression, _) =>
case Statement.EventsCopy(path, compression, _, _) =>
// For some reason Redshift JDBC doesn't handle interpolation in COPY statements
val frTableName = Fragment.const(EventsTable.withSchema(schema))
val frPath = Fragment.const0(Common.entityPathFull(path, Common.AtomicType))
@@ -194,8 +196,7 @@ object Redshift {
case Statement.SetSearchPath =>
Fragment.const0(s"SET search_path TO ${schema}")
case Statement.GetColumns(tableName) =>
val fullName = qualify(tableName)
sql"""SELECT "column" FROM PG_TABLE_DEF WHERE tablename = $fullName AND schemaname = $schema"""
sql"""SELECT "column" FROM PG_TABLE_DEF WHERE tablename = $tableName AND schemaname = $schema"""
case Statement.ManifestAdd(message) =>
val tableName = Fragment.const(qualify(ManifestName))
val types = message.types.asJson.noSpaces
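Worth spelling out why the Redshift GetColumns fragment above drops qualify(): PG_TABLE_DEF stores the unqualified table name in its tablename column and the schema separately in schemaname, so filtering on a schema-qualified name would never match anything. After the fix the rendered query is roughly the following (doobie binds $tableName and $schema as statement parameters; literals are shown here for readability, with 'atomic' standing in as a hypothetical schema name):

    SELECT "column" FROM PG_TABLE_DEF WHERE tablename = 'events' AND schemaname = 'atomic'
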
@@ -65,8 +65,8 @@ object Snowflake {
Some(Block(List(addColumn), Nil, Entity.Column(info)))
}

def getLoadStatements(discovery: DataDiscovery): LoadStatements =
NonEmptyList(Statement.EventsCopy(discovery.base, discovery.compression, getColumns(discovery)), Nil)
def getLoadStatements(discovery: DataDiscovery, eventsColumns: List[String]): LoadStatements =
NonEmptyList.one(Statement.EventsCopy(discovery.base, discovery.compression, getColumns(discovery), Nil))

// Technically, Snowflake Loader cannot create new tables
def createTable(schemas: SchemaList): Block = {
@@ -77,6 +77,8 @@ object Snowflake {
def getManifest: Statement =
Statement.CreateTable(SnowflakeManifest.getManifestDef(schema).toFragment)

def requiresEventsColumns: Boolean = false

def getColumns(discovery: DataDiscovery): List[String] = {
val atomicColumns = AtomicColumns.Columns
val shredTypeColumns = discovery.shreddedTypes
@@ -113,7 +115,7 @@ object Snowflake {
val frPath = Fragment.const0(s"@$schema.$stageName/${source.folderName}")
sql"COPY INTO $frTableName FROM $frPath FILE_FORMAT = (TYPE = CSV)"

case Statement.EventsCopy(path, _, columns) => {
case Statement.EventsCopy(path, _, columns, _) => {
def columnsForCopy: String = columns.mkString(",") + ",load_tstamp"
def columnsForSelect: String = columns.map(c => s"$$1:$c").mkString(",") + ",current_timestamp()"
