Skip to content

Commit

Permalink
RDB Loader: bump snowplow-scala-tracker to 0.6.1 (close #148)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Apr 23, 2019
1 parent c1d9efa commit 009e723
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 31 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ lazy val loader = project.in(file("."))
Dependencies.igluClient,
Dependencies.igluCoreCirce,
Dependencies.scalaTracker,
Dependencies.scalaTrackerEmit,
Dependencies.catsFree,
Dependencies.circeYaml,
Dependencies.circeGeneric,
Expand Down
29 changes: 15 additions & 14 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ object Dependencies {
val decline = "0.6.2"
val igluClient = "0.6.0-M6"
val igluCore = "0.5.0-M1"
val scalaTracker = "0.5.0"
val scalaTracker = "0.6.1"
val circeYaml = "0.9.0"
val circe = "0.11.1"
val cats = "1.6.0"
Expand All @@ -29,7 +29,7 @@ object Dependencies {
// Scala (Shredder)
val analyticsSdk = "0.4.2-M1"
val spark = "2.2.0"
val eventsManifest = "0.2.0-M2"
val eventsManifest = "0.2.0"
val schemaDdl = "0.9.0"

// Java (Loader)
Expand All @@ -52,18 +52,19 @@ object Dependencies {
)

// Scala (Loader)
val decline = "com.monovore" %% "decline" % V.decline
val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % V.igluClient
val scalaTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker" % V.scalaTracker
val manifest = "com.snowplowanalytics" %% "snowplow-processing-manifest" % V.manifest
val igluCoreCirce = "com.snowplowanalytics" %% "iglu-core-circe" % V.igluCore
val cats = "org.typelevel" %% "cats" % V.cats
val catsFree = "org.typelevel" %% "cats-free" % V.cats
val circeCore = "io.circe" %% "circe-core" % V.circe
val circeGeneric = "io.circe" %% "circe-generic" % V.circe
val circeGenericExtra = "io.circe" %% "circe-generic-extras" % V.circe
val circeYaml = "io.circe" %% "circe-yaml" % V.circeYaml
val fs2 = "co.fs2" %% "fs2-core" % V.fs2
val decline = "com.monovore" %% "decline" % V.decline
val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % V.igluClient
val scalaTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.scalaTracker
val scalaTrackerEmit = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-id" % V.scalaTracker
val manifest = "com.snowplowanalytics" %% "snowplow-processing-manifest" % V.manifest
val igluCoreCirce = "com.snowplowanalytics" %% "iglu-core-circe" % V.igluCore
val cats = "org.typelevel" %% "cats" % V.cats
val catsFree = "org.typelevel" %% "cats-free" % V.cats
val circeCore = "io.circe" %% "circe-core" % V.circe
val circeGeneric = "io.circe" %% "circe-generic" % V.circe
val circeGenericExtra = "io.circe" %% "circe-generic-extras" % V.circe
val circeYaml = "io.circe" %% "circe-yaml" % V.circeYaml
val fs2 = "co.fs2" %% "fs2-core" % V.fs2

// Scala (Shredder)
val analyticsSdk = "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % V.analyticsSdk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import implementations.ManifestInterpreter.ManifestE
class DryRunInterpreter private[interpreters](
cliConfig: CliConfig,
amazonS3: AmazonS3,
tracker: Option[Tracker],
tracker: Option[Tracker[Id]],
resolver: Client[Id, Json]) extends Interpreter {

private val logQueries = ListBuffer.empty[SqlString]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import com.snowplowanalytics.snowplow.rdbloader.loaders.Common.SqlString
class RealWorldInterpreter private[interpreters](
cliConfig: CliConfig,
amazonS3: AmazonS3,
tracker: Option[Tracker],
tracker: Option[Tracker[Id]],
resolver: Client[Id, Json]) extends Interpreter {

private val interpreter = this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,28 @@ import java.nio.charset.StandardCharsets

import scala.util.control.NonFatal

import cats.Id
import cats.data.NonEmptyList

import io.circe.Json

import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.ObjectMetadata

import org.json4s.JObject

import org.joda.time.DateTime

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}

import com.snowplowanalytics.snowplow.scalatracker._
import com.snowplowanalytics.snowplow.scalatracker.emitters.TEmitter._
import com.snowplowanalytics.snowplow.scalatracker.emitters.{AsyncBatchEmitter, AsyncEmitter}
import com.snowplowanalytics.snowplow.scalatracker.emitters.id._
import com.snowplowanalytics.snowplow.scalatracker.emitters.id.RequestProcessor._
import com.snowplowanalytics.snowplow.scalatracker.emitters.id.{SyncBatchEmitter, SyncEmitter}

// This project
import config.SnowplowConfig.{GetMethod, Monitoring, PostMethod}

object TrackerInterpreter {

import scala.concurrent.ExecutionContext.Implicits.global

val ApplicationContextSchema = SchemaKey("com.snowplowanalytics.monitoring.batch", "application_context", "jsonschema", SchemaVer.Full(1,0,0))
val LoadSucceededSchema = SchemaKey("com.snowplowanalytics.monitoring.batch", "load_succeeded", "jsonschema", SchemaVer.Full(1,0,0))
val LoadFailedSchema = SchemaKey("com.snowplowanalytics.monitoring.batch", "load_failed", "jsonschema", SchemaVer.Full(1,0,0))
Expand Down Expand Up @@ -67,18 +69,18 @@ object TrackerInterpreter {
* @param monitoring config.yml `monitoring` section
* @return some tracker if enabled, none otherwise
*/
def initializeTracking(monitoring: Monitoring): Option[Tracker] = {
def initializeTracking(monitoring: Monitoring): Option[Tracker[Id]] = {
monitoring.snowplow.flatMap(_.collector) match {
case Some(Collector((host, port))) =>
val emitter = monitoring.snowplow.flatMap(_.method) match {
case Some(GetMethod) =>
AsyncEmitter.createAndStart(host, port = Some(port), callback = Some(callback))
SyncEmitter.createAndStart(host, port = Some(port), callback = Some(callback))
case Some(PostMethod) =>
AsyncBatchEmitter.createAndStart(host, port = Some(port), bufferSize = 2)
SyncBatchEmitter.createAndStart(host, port = Some(port), bufferSize = 2)
case None =>
AsyncEmitter.createAndStart(host, port = Some(port), callback = Some(callback))
SyncEmitter.createAndStart(host, port = Some(port), callback = Some(callback))
}
val tracker = new Tracker(List(emitter), "snowplow-rdb-loader", monitoring.snowplow.flatMap(_.appId).getOrElse("rdb-loader"))
val tracker = new Tracker[Id](NonEmptyList.of(emitter), "snowplow-rdb-loader", monitoring.snowplow.flatMap(_.appId).getOrElse("rdb-loader"))
Some(tracker)
case Some(_) => None
case None => None
Expand All @@ -90,9 +92,9 @@ object TrackerInterpreter {
*
* @param tracker some tracker if enabled
*/
def trackError(tracker: Option[Tracker]): Unit = tracker match {
def trackError(tracker: Option[Tracker[Id]]): Unit = tracker match {
case Some(t) =>
t.trackSelfDescribingEvent(SelfDescribingData(LoadFailedSchema, JObject(Nil)))
t.trackSelfDescribingEvent(SelfDescribingData(LoadFailedSchema, Json.fromFields(List.empty)))
case None => ()
}

Expand All @@ -101,9 +103,9 @@ object TrackerInterpreter {
*
* @param tracker some tracker if enabled
*/
def trackSuccess(tracker: Option[Tracker]): Unit = tracker match {
def trackSuccess(tracker: Option[Tracker[Id]]): Unit = tracker match {
case Some(t) =>
t.trackSelfDescribingEvent(SelfDescribingData(LoadSucceededSchema, JObject(Nil)))
t.trackSelfDescribingEvent(SelfDescribingData(LoadSucceededSchema, Json.fromFields(List.empty)))
case None => ()
}

Expand Down

0 comments on commit 009e723

Please sign in to comment.