Skip to content

Commit

Permalink
Loader: fix inserting timestamps with wrong timezone to manifest table (close #1069)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Sep 22, 2022
1 parent c030c70 commit 71e8071
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, Shredd
import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable
import doobie.Fragment
import doobie.implicits._
import doobie.implicits.javasql._
import io.circe.syntax._

import java.sql.Timestamp

object Databricks {

val AlertingTempTableName = "rdb_folder_monitoring"
Expand Down Expand Up @@ -133,19 +130,16 @@ object Databricks {
case Statement.ManifestAdd(message) =>
val tableName = Fragment.const(qualify(Manifest.Name))
val types = message.types.asJson.noSpaces
val jobStarted: String = message.timestamps.jobStarted.toString
val jobCompleted: String = message.timestamps.jobCompleted.toString
val minTstamp: String = message.timestamps.min.map(_.toString).getOrElse("")
val maxTstamp: String = message.timestamps.max.map(_.toString).getOrElse("")
sql"""INSERT INTO $tableName
(base, types, shredding_started, shredding_completed,
min_collector_tstamp, max_collector_tstamp, ingestion_tstamp,
compression, processor_artifact, processor_version, count_good)
VALUES (${message.base}, $types,
${Timestamp.from(message.timestamps.jobStarted)}, ${Timestamp.from(
message.timestamps.jobCompleted
)},
${message.timestamps.min.map(Timestamp.from)}, ${message.timestamps.max.map(Timestamp.from)},
current_timestamp(),
${message.compression.asString}, ${message.processor.artifact}, ${message
.processor
.version}, ${message.count})"""
VALUES (${message.base}, $types, $jobStarted, $jobCompleted, $minTstamp, $maxTstamp, current_timestamp(),
${message.compression.asString}, ${message.processor.artifact}, ${message.processor.version}, ${message.count})"""
case Statement.ManifestGet(base) =>
sql"""SELECT ingestion_tstamp,
base, types, shredding_started, shredding_completed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ object StorageTarget {
props.put("warehouse", warehouse)
props.put("db", database)
props.put("application", appName)
props.put("timezone", "UTC")
role.foreach(r => props.put("role", r))
props
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,8 @@ import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, Shredd
import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable
import doobie.Fragment
import doobie.implicits._
import doobie.implicits.javasql._
import io.circe.syntax._

import java.sql.Timestamp


object Snowflake {

val EventFieldSeparator = Fragment.const0("\t")
Expand Down Expand Up @@ -209,15 +205,16 @@ object Snowflake {
case Statement.ManifestAdd(message) =>
val tableName = Fragment.const(qualify(Manifest.Name))
val types = Fragment.const0(s"parse_json('${message.types.asJson.noSpaces}')")
val jobStarted: String = message.timestamps.jobStarted.toString
val jobCompleted: String = message.timestamps.jobCompleted.toString
val minTstamp: String = message.timestamps.min.map(_.toString).getOrElse("")
val maxTstamp:String = message.timestamps.max.map(_.toString).getOrElse("")
// Redshift JDBC doesn't accept java.time.Instant
sql"""INSERT INTO $tableName
(base, types, shredding_started, shredding_completed,
min_collector_tstamp, max_collector_tstamp, ingestion_tstamp,
compression, processor_artifact, processor_version, count_good)
SELECT ${message.base}, $types,
${Timestamp.from(message.timestamps.jobStarted)}, ${Timestamp.from(message.timestamps.jobCompleted)},
${message.timestamps.min.map(Timestamp.from)}, ${message.timestamps.max.map(Timestamp.from)},
getdate(),
SELECT ${message.base}, $types, $jobStarted, $jobCompleted, $minTstamp, $maxTstamp, sysdate(),
${message.compression.asString}, ${message.processor.artifact}, ${message.processor.version}, ${message.count}"""
case Statement.ManifestGet(base) =>
sql"""SELECT ingestion_tstamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object SnowflakeDatatype {
}

final case object Timestamp extends SnowflakeDatatype {
def toDdl: Fragment = fr0"TIMESTAMP"
def toDdl: Fragment = fr0"TIMESTAMP_NTZ"
}

final case class Char(size: Int) extends SnowflakeDatatype {
Expand Down

0 comments on commit 71e8071

Please sign in to comment.