Skip to content

Commit

Permalink
Merge branch 'release/r28'
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Nov 13, 2017
2 parents 15caf98 + 66e2aff commit 2343b1d
Show file tree
Hide file tree
Showing 34 changed files with 909 additions and 209 deletions.
19 changes: 13 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,22 @@ scala:
jdk:
- oraclejdk8
before_deploy: pip install --user release-manager==0.3.0
deploy:
- provider: script
script: release-manager --config .travis/release.yml --make-artifact --upload-artifact --check-version
skip_cleanup: true
on:
tags: true
env:
global:
- secure: my2pgROi/Xut3sVM47URmR+A/e29Z+pW3dtcHfqJHn3c6CP1NaSSKM+6e3PdM7BYNl9HgT0zUYfnw4lstIi9nCP0vumcA4zbx+zOd9brV10/jhpO/RtMsSQB1ycpGffJP5z+sc6AzK11ECkOoXlGHZNx48Lf9SaDTL79tMn5MQOiJphy+4OKYBiyZEdiPX6C45uvItG9zMN6RhlVpryHzRwdeiqykdFUCd/SeKAiXnAnNFPUyEYAlKz9P3setkZKVzp/yKBNVK41W1myePOWB0Is0MSisOapf1MD/k+tXovH9Ilo+K+HZId5hMpGQMo5tCvb2N2xr7j+VsNwcW3kjgROH/0QfJD9l5pB7P67iKv0JPrOjGpI2DgO7o4/YspAYUferWEeg3JboF+eytSJgXkDF6y4Ll5xQEhEXF5VDZfKamg2MOQBnRa13egn9oom5KcQ/7UrxyirRUFQI04fp3M8hGuYjxIy98t3/7Zw448ayj+oJd0miz449/3iz/YZfuaqlTFHxc22DL9BG9hX7ygOitDDoacGEeK0Z5iL7samU8GI+V5aJP23SaV+t1gvutCEU4NwRGa+ZTPmEt4RInEko0kF69gtGEHq026vjvBGW8uKFcyZcrq1ZJTcRHo3oj2L3Ub1KKuyTxs78Jd3dAI59DSG5ZSkACcuVt5gQdI=
- secure: Q/H5Mcb1PxmpyJ8wgCWgnof098r3OA9iVqQiySb+TQSnEjv1FKfvzMU1X3HK2TePwG5SPI4QoXygsXnDuo2lV4cPtofwULDvtNoqd8ii2ojFHbp6nvSrD7fxSVq0Vr20FfavrWzqN9/dB8m7/vyuLl2DGTsSFNMrB40Gt1OfoYFvnah3RvyhFYw0WNluPp3n4+l1zd4I64WJKU60zP2mvzRaNzXPfvXf9igGbC9vtIxaaVb9jRr7DtQrk3wqxdjY3t5Zr6ZDYtqFeUkOCskIb0ghL4Idegm7D8MMKemz8N+ZMZPQgJkGMZQj4gxBwFH0YtROnx1y4O8c21CwKA8LVg1H2n7nrwEHyjaZIwCbZbT8eoYHyyvTeXhhMFTsk/GCksMWvV4K9SuvSsDz33yDPsopdEdwfGJ6F4wqhb5MezjSlktaGeAEGsp04H0AXjprTumCboodYAgLz3CZuv0OtMyxHyH+V2q2hLnhJiWC5ue0WsWLx1WzfzIhHIxUvVqtY4fHyZW3OBycvbvDm/Auxj8xkWAXzBP315vJSyFLHkde3VThPnQC17zdN4JBcyeGXVNd7ZbIvoEQfjKOUmaW1plFIZtQPyqfx6vXWak/6ICI6hX+lAgSBjMHkvv/4lGrbgQOrM+VzkfRBkVb3Ybi+IDBkE5wodAntB5KQG9GsSA=
- secure: MJp/yUl9TViL4mCMu5v2gNuxvrfG5/atcOMjNchtvs3fzY9RNEMp0d0Qj2HVzSdTY0HizWlprc9vOWtiMeoDnGaMNbPQSxNoujnTq+xK4U6HycFuD1iGGTr/Zf5NfGAmQP9ty9CgHZJkXKrKHIlw2fJV18vq82fykiOkb+lFXhrSd373gO6tmG9AQsfd/+DTTpGVE9yblfFyqa8Zed59Qrr9ZC2/qlJwouFKDjIOc/iU0rwFR+MaxiaezLIOyPNT/K/zXyoiNQwr3uMv3EZqvO1yv/ymW3y6sk1IqyWzgP9fjFllHdhGOPLoUMSx898NihaCeXUsMZeFEFQ95sgevPiRpOJ47qd1Qimx8+o8Q0kTqU9evY9wo+Rsd8Odqx1+MIPabk1xzDB4BvDMqsHQfRx0CJT82sfW+fXxoHNTFU7I7jJBBL5T7GdntXSMxFZuYL34TFTiEkUBz5e/k6oY5c987iU4EX5Jbgq9sWbvJ6UTLqemMBMIR4TMaYlWNpXp5bHnsaBQqlS1VzkF/PWnx8edVVvfk7CW27b9asnEvBYeM62XibSihGbRqydv8QU3OFjH3mq4bZAOc/ePlnky3OOCiXKhh2PJusQpZa6PrRsDE+/4q7jkebLiC16S9Dah1FZeqzGJDXMabH4WGG4BdDflgrYueMKA8HnH2rtLvMA=
- secure: sP7R6sZqekVEeAt/NfgTO9kLZ+tpWrVCRWJJkulh2cW6flPkGqnSH+5HHa8dj00ZNojl7NZqyIwnmBOCfkYS+ncIpXNvHah2DLEuwZqxXtZhLU2mZg6DKGBaCfArhtXRf6Y8FQ+e9unZi9A5yqT7HScaABeWglYCJIIZ6o0tFL9MB0Zx6b1bHDAgrsvFCqUeyWvt7lEQ8fHi/pgHaVuXAmyO2bYKgY2Vwb6bILUTtFMSvknfIXPzdUNQeOalGXUjHGtsSvyuKZbR8kOhnAn9fjgNtxIyUT+u1BUY0HfuYH8653o8XzQMdifja9C0LPAoJ+eaCBQOXV/ZxoXr1/deNPZfLsaIXtmmzXpHbniWDYuM69zvrZM0y+VsuhIOCKVtllvyvd3EGNwjIP76HYTYxCeGh0p3R7Vde2BgsBBf45VkGTrNKA7sOyZ8JilaGnlEfB7ujyZ6L1QXd1ihRryuoZ8iXzngxobKmqC9m+uCyeC6+cvU2jeNVTkYfoVaRW3zw6RgSDLG5mJfZPREwOBCRQXms6ifzgj7HFw1W5FdlHrM+EyvTznnXWk1OGFeIVXezqtCIq8d9TpRQbG1ezlK0y/TYasidUrJUo2tri4LFTkkGyqUSQxNU+Tkjm18nUUjlphPLMANO6uuD1E6mouG7nQOo/ipR/46jUCPCgcrl20=
deploy:
- provider: script
script: ./.travis/deploy.sh rdb_loader $TRAVIS_TAG
skip_cleanup: true
on:
tags: true
condition: '"$(.travis/is_release_tag.sh rdb_loader $TRAVIS_TAG)" == "" && $? == 0'
- provider: script
script: ./.travis/deploy.sh rdb_shredder $TRAVIS_TAG
skip_cleanup: true
on:
tags: true
condition: '"$(.travis/is_release_tag.sh rdb_shredder $TRAVIS_TAG)" == "" && $? == 0'
30 changes: 30 additions & 0 deletions .travis/deploy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/bash

project=$1
tag=$2

slashed="${project}/"
slashed_len=${#slashed}

cicd=${tag:0:${slashed_len}}
release=${tag:${slashed_len}}

if [ "${cicd}" == "${slashed}" ]; then
if [ "${release}" == "" ]; then
echo "WARNING! No release specified! Ignoring."
exit 2
fi
else
echo "This can't be deployed - there's no ${project} tag! (Is the travis condition set?)"
exit 1
fi

cd "${TRAVIS_BUILD_DIR}"

export TRAVIS_BUILD_RELEASE_TAG="${release}"
release-manager \
--config "./.travis/release_${project}.yml" \
--check-version \
--make-version \
--make-artifact \
--upload-artifact
20 changes: 20 additions & 0 deletions .travis/is_release_tag.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash

project=$1
tag=$2

slashed="${project}/"
slashed_len=${#slashed}

cicd=${tag:0:${slashed_len}}
release=${tag:${slashed_len}}

if [ "${cicd}" == "${slashed}" ]; then
if [ "${release}" == "" ]; then
echo "Warning! No release specified! Ignoring."
exit 2
fi
exit 0
else
exit 1
fi
2 changes: 1 addition & 1 deletion .travis/release.yml → .travis/release_rdb_loader.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ packages:
version : <%= FUNC['sbt_version(.)'] %>

# Required IF '--check-version' is passed: will assert that both versions are the same
build_version : <%= ENV['TRAVIS_TAG'] %>
build_version : <%= ENV['TRAVIS_BUILD_RELEASE_TAG'] %>

# Optional: Build commands
build_commands:
Expand Down
79 changes: 79 additions & 0 deletions .travis/release_rdb_shredder.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
---
# --- Variables --- #

local:
root_dir : <%= ENV['TRAVIS_BUILD_DIR'] %>

# --- Release Manager Config --- #

# Required: deployment targets
targets:
- type : "awss3"
access_key_id : <%= ENV['AWS_ACCESS_KEY'] %>
secret_access_key : <%= ENV['AWS_SECRET_KEY'] %>

# Required: packages to be deployed
packages:
- name : "rdb-shredder-hosted-asset"
locations:
- bucket : "snowplow-hosted-assets"
path : "4-storage/rdb-shredder"
region : "eu-west-1"
- bucket : "snowplow-hosted-assets-us-east-1"
path : "4-storage/rdb-shredder"
region : "us-east-1"
- bucket : "snowplow-hosted-assets-us-west-1"
path : "4-storage/rdb-shredder"
region : "us-west-1"
- bucket : "snowplow-hosted-assets-us-west-2"
path : "4-storage/rdb-shredder"
region : "us-west-2"
- bucket : "snowplow-hosted-assets-sa-east-1"
path : "4-storage/rdb-shredder"
region : "sa-east-1"
- bucket : "snowplow-hosted-assets-eu-central-1"
path : "4-storage/rdb-shredder"
region : "eu-central-1"
- bucket : "snowplow-hosted-assets-ap-southeast-1"
path : "4-storage/rdb-shredder"
region : "ap-southeast-1"
- bucket : "snowplow-hosted-assets-ap-southeast-2"
path : "4-storage/rdb-shredder"
region : "ap-southeast-2"
- bucket : "snowplow-hosted-assets-ap-northeast-1"
path : "4-storage/rdb-shredder"
region : "ap-northeast-1"
- bucket : "snowplow-hosted-assets-ap-south-1"
path : "4-storage/rdb-shredder"
region : "ap-south-1"
- bucket : "snowplow-hosted-assets-us-east-2"
path : "4-storage/rdb-shredder"
region : "us-east-2"
- bucket : "snowplow-hosted-assets-ca-central-1"
path : "4-storage/rdb-shredder"
region : "ca-central-1"
- bucket : "snowplow-hosted-assets-eu-west-2"
path : "4-storage/rdb-shredder"
region : "eu-west-2"
- bucket : "snowplow-hosted-assets-ap-northeast-2"
path : "4-storage/rdb-shredder"
region : "ap-northeast-2"
publish : true
override : false
continue_on_conflict : false
version : <%= CMD['sbt "project shredder" version -Dsbt.log.noformat=true | awk "END{print}" | sed -r "s/\[info\]\s(.*)/\1/" | xargs echo -n'] %>

# Required IF '--check-version' is passed: will assert that both versions are the same
build_version : <%= ENV['TRAVIS_BUILD_RELEASE_TAG'] %>

# Optional: Build commands
build_commands:
- sbt "project shredder" assembly

# Required: Artifact. Version will be inserted between prefix and suffix
artifacts:
- type : "asis"
prefix : "snowplow-rdb-shredder-"
suffix : ".jar"
binary_paths:
- "shredder/target/scala-2.11/snowplow-rdb-shredder-{{ packages.0.build_version }}.jar"
20 changes: 20 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
Release 28 (2017-11-13)
-----------------------
Common: add CI/CD (#55)
Common: remove AWS Java SDK shading (#54)
RDB Shredder: add Snowplow and Clojars resolvers (#56)
RDB Shredder: bump Spark to 2.2.0 (#52)
RDB Shredder: bump to 0.13.0 (#49)
RDB Shredder: bump scala-common-enrich to 0.27.0 (#39)
RDB Shredder: overwrite output datasets (#41)
RDB Loader: bump sbt-assembly to 0.14.5 (#51)
RDB Loader: bump SBT to 0.13.16 (#50)
RDB Loader: allow JDBC credentials to be stored in EC2 parameter store (#19)
RDB Loader: add support for SSH tunnels (#22)
RDB Loader: bump AWS SDK to 1.11.208 (#48)
RDB Loader: bump redshift-jdbc to 1.2.8.1005 (#40)
RDB Loader: make loading shredded data always required (#29)
RDB Loader: remove tracking from dry run (#42)
RDB Loader: execute manifest insert in same transaction as load (#36)
RDB Loader: make logkey optional (#35)

Version 0.13.0 (2017-09-06)
---------------------------
Common: migrate CHANGELOG from snowplow/snowplow (#24)
Expand Down
7 changes: 5 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
lazy val loader = project.in(file("."))
.settings(
name := "snowplow-rdb-loader",
version := "0.13.0",
version := "0.14.0",
initialCommands := "import com.snowplowanalytics.snowplow.rdbloader._",
mainClass in Compile := Some("com.snowplowanalytics.snowplow.rdbloader.Main")
)
Expand All @@ -37,7 +37,10 @@ lazy val loader = project.in(file("."))

Dependencies.postgres,
Dependencies.redshift,
Dependencies.redshiftSdk,
Dependencies.s3,
Dependencies.ssm,
Dependencies.jSch,

Dependencies.specs2,
Dependencies.specs2ScalaCheck,
Expand All @@ -48,7 +51,7 @@ lazy val loader = project.in(file("."))
lazy val shredder = project.in(file("shredder"))
.settings(
name := "snowplow-rdb-shredder",
version := "0.12.0",
version := "0.13.0",
description := "Spark job to shred event and context JSONs from Snowplow enriched events",
BuildSettings.oneJvmPerTestSetting // ensures that only CrossBatchDeduplicationSpec has a DuplicateStorage
)
Expand Down
16 changes: 8 additions & 8 deletions project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ object BuildSettings {

lazy val assemblySettings = Seq(
jarName,

assemblyShadeRules in assembly := Seq(
ShadeRule.rename(
// EMR has 0.1.42 installed
"com.jcraft.jsch.**" -> "shadejsch.@1"
).inAll
),

assemblyMergeStrategy in assembly := {
case PathList("META-INF", _ @ _*) => MergeStrategy.discard
case PathList("reference.conf", _ @ _*) => MergeStrategy.concat
Expand All @@ -73,14 +81,6 @@ object BuildSettings {

lazy val shredderAssemblySettings = Seq(
jarName,
// Slightly cleaner jar name
// For AMI 4.5.0, could be removed in future versions
assemblyShadeRules in assembly := Seq(
ShadeRule.rename(
"com.amazonaws.**" -> "shadeaws.@1",
"org.apache.http.**" -> "shadehttp.@1"
).inAll
),
// Drop these jars
assemblyExcludedJars in assembly := {
val cp = (fullClasspath in assembly).value
Expand Down
24 changes: 16 additions & 8 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ object Dependencies {
val cats = "0.9.0"

// Scala (Shredder)
val spark = "2.1.0"
val commonEnrich = "0.25.0"
val spark = "2.2.0"
val commonEnrich = "0.27.0"

// Java (Loader)
val postgres = "42.0.0"
val redshift = "1.2.1.1001"
val aws = "1.10.77"
val redshift = "1.2.8.1005"
val aws = "1.11.208"
val jSch = "0.1.54"

// Java (Shredder)
val dynamodb = "1.11.98"
Expand All @@ -47,7 +48,11 @@ object Dependencies {
// For specs2
"scalaz-bintray" at "http://dl.bintray.com/scalaz/releases",
// Redshift native driver
"redshift" at "http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release"
"redshift" at "http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release",
// For Snowplow libs (SCE transient)
"Snowplow Analytics Maven repo" at "http://maven.snplow.com/releases/",
// For uaParser utils (SCE transient)
"user-agent-parser repo" at "https://clojars.org/repo/"
)

// Scala (Loader)
Expand All @@ -70,9 +75,12 @@ object Dependencies {
val sparkSQL = "org.apache.spark" %% "spark-sql" % V.spark % "provided"

// Java (Loader)
val postgres = "org.postgresql" % "postgresql" % V.postgres
val redshift = "com.amazon.redshift" % "redshift-jdbc42" % V.redshift
val s3 = "com.amazonaws" % "aws-java-sdk-s3" % V.aws
val postgres = "org.postgresql" % "postgresql" % V.postgres
val redshift = "com.amazon.redshift" % "redshift-jdbc42-no-awssdk" % V.redshift
val redshiftSdk = "com.amazonaws" % "aws-java-sdk-redshift" % V.aws
val s3 = "com.amazonaws" % "aws-java-sdk-s3" % V.aws
val ssm = "com.amazonaws" % "aws-java-sdk-ssm" % V.aws
val jSch = "com.jcraft" % "jsch" % V.jSch

// Java (Shredder)
val dynamodb = "com.amazonaws" % "aws-java-sdk-dynamodb" % V.dynamodb
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=0.13.15
sbt.version=0.13.16
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
logLevel := Level.Warn

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import com.fasterxml.jackson.databind.JsonNode
// Spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Scalaz
import scalaz._
Expand Down Expand Up @@ -64,7 +65,8 @@ object ShredJob extends SparkJob {
classOf[com.fasterxml.jackson.databind.node.ArrayNode],
classOf[com.fasterxml.jackson.databind.node.NullNode],
classOf[com.fasterxml.jackson.databind.node.JsonNodeFactory],
classOf[org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage]
classOf[org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage],
classOf[org.apache.spark.sql.execution.datasources.FileFormatWriter$WriteTaskResult]
)
override def sparkConfig(): SparkConf = new SparkConf()
.setAppName(getClass().getSimpleName())
Expand Down Expand Up @@ -345,7 +347,7 @@ class ShredJob(@transient val spark: SparkSession, args: Array[String]) extends
// Handling of malformed rows
val bad = common
.flatMap { case (line, shredded) => projectBads(line, shredded) }
.map { case (line, errors) => new BadRow(line, errors).toCompactJson }
.map { case (line, errors) => Row(BadRow(line, errors).toCompactJson) }

// Handling of properly-formed rows, only one event from an event id and event fingerprint
// combination is kept
Expand Down Expand Up @@ -393,10 +395,13 @@ class ShredJob(@transient val spark: SparkSession, args: Array[String]) extends
// Write errors unioned with errors occurred during cross-batch deduplication
val badDupes = bad ++ dupeFailed
.flatMap { case (s, dupe) => dupe match {
case Failure(m) => Some(new BadRow(s.originalLine, m).toCompactJson)
case Failure(m) => Some(Row(BadRow(s.originalLine, m).toCompactJson))
case _ => None
}}
badDupes.saveAsTextFile(shredConfig.badFolder)
} }
spark.createDataFrame(badDupes, StructType(StructField("_", StringType, true) :: Nil))
.write
.mode(SaveMode.Overwrite)
.text(shredConfig.badFolder)

// Create duplicate JSON contexts for well-formed events having duplicates
// This creates not canonical contexts (with schema and data), but shredded hierarchies
Expand All @@ -407,8 +412,11 @@ class ShredJob(@transient val spark: SparkSession, args: Array[String]) extends

// Ready the events for database load
val events = goodWithSyntheticDupes
.map(e => alterEnrichedEvent(e.shredded.originalLine, e.newEventId))
events.saveAsTextFile(getAlteredEnrichedOutputPath(shredConfig.outFolder))
.map(e => Row(alterEnrichedEvent(e.shredded.originalLine, e.newEventId)))
spark.createDataFrame(events, StructType(StructField("_", StringType, true) :: Nil))
.write
.mode(SaveMode.Overwrite)
.text(getAlteredEnrichedOutputPath(shredConfig.outFolder))

// Update the shredded JSONs with the new deduplicated event IDs and stringify
val jsons = (goodWithSyntheticDupes
Expand Down
Loading

0 comments on commit 2343b1d

Please sign in to comment.