diff --git a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala
index 503d0b997..9a262da4b 100644
--- a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala
+++ b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala
@@ -25,11 +25,8 @@ import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, Shredd
 import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable
 import doobie.Fragment
 import doobie.implicits._
-import doobie.implicits.javasql._
 import io.circe.syntax._
 
-import java.sql.Timestamp
-
 object Databricks {
 
   val AlertingTempTableName = "rdb_folder_monitoring"
@@ -133,17 +130,15 @@ object Databricks {
     case Statement.ManifestAdd(message) =>
       val tableName = Fragment.const(qualify(Manifest.Name))
       val types = message.types.asJson.noSpaces
+      val jobStarted: String = message.timestamps.jobStarted.toString
+      val jobCompleted: String = message.timestamps.jobCompleted.toString
+      // Keep Option so a missing collector tstamp is interpolated as SQL NULL, not ''
+      val minTstamp: Option[String] = message.timestamps.min.map(_.toString)
+      val maxTstamp: Option[String] = message.timestamps.max.map(_.toString)
       sql"""INSERT INTO $tableName
         (base, types, shredding_started, shredding_completed, min_collector_tstamp, max_collector_tstamp, ingestion_tstamp, compression, processor_artifact, processor_version, count_good)
-        VALUES (${message.base}, $types,
-        ${Timestamp.from(message.timestamps.jobStarted)}, ${Timestamp.from(
-        message.timestamps.jobCompleted
-      )},
-        ${message.timestamps.min.map(Timestamp.from)}, ${message.timestamps.max.map(Timestamp.from)},
-        current_timestamp(),
-        ${message.compression.asString}, ${message.processor.artifact}, ${message
-        .processor
-        .version}, ${message.count})"""
+        VALUES (${message.base}, $types, $jobStarted, $jobCompleted, $minTstamp, $maxTstamp, current_timestamp(),
+        ${message.compression.asString}, ${message.processor.artifact}, ${message.processor.version}, ${message.count})"""
 
     case Statement.ManifestGet(base) =>
       sql"""SELECT ingestion_tstamp, base, types, shredding_started, shredding_completed,
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala
index adecf571c..535ad3439 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala
@@ -168,6 +168,7 @@ object StorageTarget {
       props.put("warehouse", warehouse)
       props.put("db", database)
       props.put("application", appName)
+      props.put("timezone", "UTC")
       role.foreach(r => props.put("role", r))
       props
     }
diff --git a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala
index 82299ac67..88d9e24b2 100644
--- a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala
+++ b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala
@@ -32,12 +32,8 @@ import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, Shredd
 import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable
 import doobie.Fragment
 import doobie.implicits._
-import doobie.implicits.javasql._
 import io.circe.syntax._
 
-import java.sql.Timestamp
-
-
 object Snowflake {
 
   val EventFieldSeparator = Fragment.const0("\t")
@@ -209,15 +205,16 @@ object Snowflake {
     case Statement.ManifestAdd(message) =>
       val tableName = Fragment.const(qualify(Manifest.Name))
       val types =
        Fragment.const0(s"parse_json('${message.types.asJson.noSpaces}')")
+      val jobStarted: String = message.timestamps.jobStarted.toString
+      val jobCompleted: String = message.timestamps.jobCompleted.toString
+      // Keep Option so a missing collector tstamp is interpolated as SQL NULL, not ''
+      val minTstamp: Option[String] = message.timestamps.min.map(_.toString)
+      val maxTstamp: Option[String] = message.timestamps.max.map(_.toString)
-      // Redshift JDBC doesn't accept java.time.Instant
       sql"""INSERT INTO $tableName
         (base, types, shredding_started, shredding_completed, min_collector_tstamp, max_collector_tstamp, ingestion_tstamp, compression, processor_artifact, processor_version, count_good)
-      SELECT ${message.base}, $types,
-      ${Timestamp.from(message.timestamps.jobStarted)}, ${Timestamp.from(message.timestamps.jobCompleted)},
-      ${message.timestamps.min.map(Timestamp.from)}, ${message.timestamps.max.map(Timestamp.from)},
-      getdate(),
+      SELECT ${message.base}, $types, $jobStarted, $jobCompleted, $minTstamp, $maxTstamp, sysdate(),
       ${message.compression.asString}, ${message.processor.artifact}, ${message.processor.version}, ${message.count}"""
 
     case Statement.ManifestGet(base) =>
       sql"""SELECT ingestion_tstamp,
diff --git a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/ast/SnowflakeDatatype.scala b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/ast/SnowflakeDatatype.scala
index a335b0e14..d9084253f 100644
--- a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/ast/SnowflakeDatatype.scala
+++ b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/ast/SnowflakeDatatype.scala
@@ -33,7 +33,7 @@ object SnowflakeDatatype {
   }
 
   final case object Timestamp extends SnowflakeDatatype {
-    def toDdl: Fragment = fr0"TIMESTAMP"
+    def toDdl: Fragment = fr0"TIMESTAMP_NTZ"
   }
 
   final case class Char(size: Int) extends SnowflakeDatatype {