RDB Loader: bump snowplow-scala-tracker to 0.5.0 (close #57)
chuwy committed Apr 29, 2018
1 parent 958b62f commit 4c45b2e
Showing 5 changed files with 25 additions and 29 deletions.
3 changes: 0 additions & 3 deletions build.sbt
@@ -26,9 +26,7 @@ lazy val loader = project.in(file("."))
  libraryDependencies ++= Seq(
    Dependencies.scopt,
    Dependencies.scalaz7,
-   Dependencies.json4s,
    Dependencies.igluClient,
-   Dependencies.igluCore,
    Dependencies.scalaTracker,
    Dependencies.catsFree,
    Dependencies.circeYaml,
@@ -68,7 +66,6 @@ lazy val shredder = project.in(file("shredder"))
    // Scala
    Dependencies.sparkCore,
    Dependencies.sparkSQL,
-   Dependencies.json4s,
    Dependencies.scalaz7,
    Dependencies.scopt,
    Dependencies.commonEnrich,
6 changes: 1 addition & 5 deletions project/Dependencies.scala
@@ -17,10 +17,8 @@ object Dependencies {
    // Scala (Loader)
    val scopt = "3.6.0"
    val scalaz7 = "7.0.9"
-   val json4s = "3.2.11" // evicted by iglu-core with 3.3.0
    val igluClient = "0.5.0"
-   val igluCore = "0.1.0"
-   val scalaTracker = "0.3.0"
+   val scalaTracker = "0.5.0"
    val circeYaml = "0.7.0"
    val circe = "0.9.3"
    val cats = "1.1.0"
@@ -59,9 +57,7 @@ object Dependencies {
    // Scala (Loader)
    val scopt = "com.github.scopt" %% "scopt" % V.scopt
    val scalaz7 = "org.scalaz" %% "scalaz-core" % V.scalaz7
-   val json4s = "org.json4s" %% "json4s-jackson" % V.json4s
    val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % V.igluClient
-   val igluCore = "com.snowplowanalytics" %% "iglu-core" % V.igluCore intransitive()
    val scalaTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker" % V.scalaTracker
    val manifest = "com.snowplowanalytics" %% "snowplow-processing-manifest" % V.manifest
    val cats = "org.typelevel" %% "cats" % V.cats
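A note on the dependency cleanup above: the comment on the removed json4s pin records that iglu-core was already forcing json4s 3.3.0 by eviction, and the old iglu-core 0.1.0 entry was marked intransitive() to keep its own dependencies out of the build. With the tracker at 0.5.0, iglu-core evidently arrives transitively anyway (the TrackerInterpreter hunks below now use iglu-core's SelfDescribingData), so both explicit entries become redundant. After a cleanup like this, sbt's built-in eviction report (available since sbt 0.13.8) is a quick way to audit what dependency mediation actually selected:

    sbt evicted

It lists every artifact that was chosen over an older requested version, which is where a lingering json4s conflict would show up.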
11 changes: 7 additions & 4 deletions ManifestDiscovery.scala

@@ -151,17 +151,20 @@ object ManifestDiscovery {
    val version = Semver.decodeSemver(record.application.version).toValidatedNel
    val types = record.payload.flatMap(RdbPayload.ShreddedTypesGet).map(_.shreddedTypes).getOrElse(Set.empty)
    val schemaKeys = types.toList.traverse { t => SchemaKey.fromUri(t) match {
-     case Some(ss) => ss.validNel[String]
-     case None => s"Key [$t] is invalid Iglu URI".invalidNel[SchemaKey]
+     case Some(ss) if ss.version.getModel.isEmpty => s"No support for partial schema: ${ss.toSchemaUri}".invalidNel[(SchemaKey, Int)]
+     case Some(ss) => (ss, ss.version.getModel.get).validNel[String]
+     case None => s"Key [$t] is invalid Iglu URI".invalidNel[(SchemaKey, Int)]
    }}

    val base = S3.Folder
      .parse(record.itemId)
      .leftMap(message => s"Path [${record.itemId}] is not valid base for shredded type. $message")
      .toValidatedNel

-   (version, schemaKeys, base).mapN { (v: Semver, k: List[SchemaKey], b: S3.Folder) =>
-     k.map(kk => ShreddedType.Info(b, kk.vendor, kk.name, kk.version.model, v))
+   (version, schemaKeys, base).mapN { (v: Semver, k: List[(SchemaKey, Int)], b: S3.Folder) =>
+     k.map { case (keyInfo, model) =>
+       ShreddedType.Info(b, keyInfo.vendor, keyInfo.name, model, v)
+     }
    } match {
      case Validated.Valid(infos) => infos.distinct.asRight
      case Validated.Invalid(errors) =>
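The reworked traversal now rejects schema keys whose SchemaVer is only partially specified and threads the extracted MODEL alongside the key, so ShreddedType.Info can be built without touching the version again. A minimal sketch of the same check as a standalone function, reusing the exact calls from the hunk above (the name keyWithModel is invented for illustration):

    import cats.data.ValidatedNel
    import cats.implicits._

    import com.snowplowanalytics.iglu.core.SchemaKey

    // Hypothetical helper mirroring the new validation:
    // a key is only usable when its SchemaVer carries a concrete MODEL
    def keyWithModel(uri: String): ValidatedNel[String, (SchemaKey, Int)] =
      SchemaKey.fromUri(uri) match {
        case Some(key) =>
          key.version.getModel match {
            case Some(model) => (key, model).validNel
            case None => s"No support for partial schema: ${key.toSchemaUri}".invalidNel
          }
        case None => s"Key [$uri] is invalid Iglu URI".invalidNel
      }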
25 changes: 12 additions & 13 deletions ShreddedType.scala

@@ -18,16 +18,12 @@ import cats.free.Free
import cats.implicits._

import com.snowplowanalytics.iglu.client.SchemaCriterion

- import com.snowplowanalytics.iglu.core.SchemaKey
+ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

import com.snowplowanalytics.snowplow.rdbloader.LoaderError._
import com.snowplowanalytics.snowplow.rdbloader.config.Semver
import com.snowplowanalytics.snowplow.rdbloader.utils.Common.toSnakeCase

- // This project
-
-
/**
 * Container for S3 folder with shredded JSONs ready to load
 * Usually it represents self-describing event or custom/derived context
@@ -214,11 +210,11 @@ object ShreddedType {
    val (bucket, path) = S3.splitS3Key(key)
    val (subpath, shredpath) = splitFilpath(path, shredJob)
    extractSchemaKey(shredpath, shredJob) match {
-     case Some(schemaKey) =>
+     case Some(SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _))) =>
        val prefix = S3.Folder.coerce("s3://" + bucket + "/" + subpath)
-       val result = Info(prefix, schemaKey.vendor, schemaKey.name, schemaKey.version.model, shredJob)
+       val result = Info(prefix, vendor, name, model, shredJob)
        result.asRight
-     case None =>
+     case _ =>
        ShreddedTypeKeyFailure(key).asLeft
    }
  }
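Matching on SchemaVer.Full here is what lets the code drop the old schemaKey.version.model access: the pattern only fires for a fully specified version, so model is a plain Int by construction, and a partial version falls through to the failure case. A small self-contained sketch (the com.acme schema is invented):

    import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

    object FullVersionDemo extends App {
      // Invented schema coordinates; a partial SchemaVer would not match Full
      val key = SchemaKey("com.acme", "example_event", "jsonschema", SchemaVer.Full(1, 0, 0))

      key match {
        case SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _)) =>
          println(s"$vendor/$name -> MODEL $model") // com.acme/example_event -> MODEL 1
        case _ =>
          println("partial version, skipping")
      }
    }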
@@ -233,14 +229,17 @@
   * @param shredJob shred job version to decide what format should be present
   * @return valid schema key if found
   */
- def extractSchemaKey(subpath: String, shredJob: Semver): Option[SchemaKey] =
-   if (shredJob <= ShredJobBeforeSparkVersion) SchemaKey.fromPath(subpath)
-   else subpath match {
+ def extractSchemaKey(subpath: String, shredJob: Semver): Option[SchemaKey] = {
+   if (shredJob <= ShredJobBeforeSparkVersion) {
+     val uri = "iglu:" + subpath
+     SchemaKey.fromUri(uri)
+   } else subpath match {
      case ShreddedSubpathPattern(vendor, name, format, version) =>
-       val igluPath = s"$vendor/$name/$format/$version"
-       SchemaKey.fromPath(igluPath)
+       val uri = s"iglu:$vendor/$name/$format/$version"
+       SchemaKey.fromUri(uri)
      case _ => None
    }
+ }

/**
* Split S3 filepath (without bucket name) into subpath and shreddedpath
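In extractSchemaKey, both branches now prepend the iglu: scheme and parse through SchemaKey.fromUri, presumably because the newer iglu-core no longer offers the old SchemaKey.fromPath. A sketch of the conversion on an invented shredded subpath:

    import com.snowplowanalytics.iglu.core.SchemaKey

    // Invented subpath; both branches above reduce to this shape
    val subpath = "com.acme/example_event/jsonschema/1-0-0"
    val key: Option[SchemaKey] = SchemaKey.fromUri("iglu:" + subpath)
    // Some(SchemaKey("com.acme", "example_event", "jsonschema", SchemaVer.Full(1, 0, 0)))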
9 changes: 5 additions & 4 deletions TrackerInterpreter.scala

@@ -28,6 +28,7 @@ import org.joda.time.DateTime
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}

import com.snowplowanalytics.snowplow.scalatracker._
+ import com.snowplowanalytics.snowplow.scalatracker.emitters.TEmitter._
import com.snowplowanalytics.snowplow.scalatracker.emitters.{AsyncBatchEmitter, AsyncEmitter}

// This project
@@ -71,11 +72,11 @@ object TrackerInterpreter {
      case Some(Collector((host, port))) =>
        val emitter = monitoring.snowplow.flatMap(_.method) match {
          case Some(GetMethod) =>
-           AsyncEmitter.createAndStart(host, port = port)
+           AsyncEmitter.createAndStart(host, port = Some(port), callback = Some(callback))
          case Some(PostMethod) =>
-           AsyncBatchEmitter.createAndStart(host, port = port, bufferSize = 2)
+           AsyncBatchEmitter.createAndStart(host, port = Some(port), bufferSize = 2)
          case None =>
-           AsyncEmitter.createAndStart(host, port = port)
+           AsyncEmitter.createAndStart(host, port = Some(port), callback = Some(callback))
        }
        val tracker = new Tracker(List(emitter), "snowplow-rdb-loader", monitoring.snowplow.flatMap(_.appId).getOrElse("rdb-loader"))
        Some(tracker)
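As this hunk shows, the 0.5.0 emitter constructors take the port as Option[Int] and accept an optional result callback (the callback value itself is defined elsewhere in the file, outside this hunk). A minimal sketch of wiring a tracker against that API, with placeholder host, namespace, and appId, and the callback omitted:

    import com.snowplowanalytics.snowplow.scalatracker.Tracker
    import com.snowplowanalytics.snowplow.scalatracker.emitters.AsyncEmitter

    // Placeholder collector host, namespace and appId; no callback (None)
    val emitter = AsyncEmitter.createAndStart("collector.example.com", port = Some(8080), callback = None)
    val tracker = new Tracker(List(emitter), "example-namespace", "example-app")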
@@ -103,7 +104,7 @@
*/
  def trackSuccess(tracker: Option[Tracker]): Unit = tracker match {
    case Some(t) =>
-     t.trackUnstructEvent(SelfDescribingJson(LoadSucceededSchema, JObject(Nil)))
+     t.trackSelfDescribingEvent(SelfDescribingData(LoadSucceededSchema, JObject(Nil)))
    case None => ()
  }

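The last hunk captures the tracker's API rename: trackUnstructEvent(SelfDescribingJson(...)) becomes trackSelfDescribingEvent(SelfDescribingData(...)), taking iglu-core's SelfDescribingData while keeping a json4s payload. A sketch with an invented schema, assuming a Tracker built as in the emitter example above:

    import org.json4s.JsonAST.{JObject, JString}

    import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
    import com.snowplowanalytics.snowplow.scalatracker.Tracker

    // Invented schema coordinates; `tracker` comes from the emitter sketch
    def trackLoadSucceeded(tracker: Tracker): Unit = {
      val schema = SchemaKey("com.acme", "load_succeeded", "jsonschema", SchemaVer.Full(1, 0, 0))
      tracker.trackSelfDescribingEvent(SelfDescribingData(schema, JObject(List("status" -> JString("ok")))))
    }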
