diff --git a/build.sbt b/build.sbt index 960fe20e6..bfd859386 100755 --- a/build.sbt +++ b/build.sbt @@ -26,9 +26,8 @@ lazy val loader = project.in(file(".")) libraryDependencies ++= Seq( Dependencies.scopt, Dependencies.scalaz7, - Dependencies.json4s, Dependencies.igluClient, - Dependencies.igluCore, + Dependencies.igluCoreCirce, Dependencies.scalaTracker, Dependencies.catsFree, Dependencies.circeYaml, @@ -68,11 +67,11 @@ lazy val shredder = project.in(file("shredder")) // Scala Dependencies.sparkCore, Dependencies.sparkSQL, - Dependencies.json4s, Dependencies.scalaz7, Dependencies.scopt, Dependencies.commonEnrich, Dependencies.igluClient, + Dependencies.igluCoreCirce, Dependencies.manifest, // Scala (test only) Dependencies.specs2 diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 5f6d85105..6a39b925a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,10 +17,9 @@ object Dependencies { // Scala (Loader) val scopt = "3.6.0" val scalaz7 = "7.0.9" - val json4s = "3.2.11" // evicted by iglu-core with 3.3.0 val igluClient = "0.5.0" - val igluCore = "0.1.0" - val scalaTracker = "0.3.0" + val igluCore = "0.2.0" + val scalaTracker = "0.5.0" val circeYaml = "0.7.0" val circe = "0.9.3" val cats = "1.1.0" @@ -59,11 +58,10 @@ object Dependencies { // Scala (Loader) val scopt = "com.github.scopt" %% "scopt" % V.scopt val scalaz7 = "org.scalaz" %% "scalaz-core" % V.scalaz7 - val json4s = "org.json4s" %% "json4s-jackson" % V.json4s val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % V.igluClient - val igluCore = "com.snowplowanalytics" %% "iglu-core" % V.igluCore intransitive() val scalaTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker" % V.scalaTracker val manifest = "com.snowplowanalytics" %% "snowplow-processing-manifest" % V.manifest + val igluCoreCirce = "com.snowplowanalytics" %% "iglu-core-circe" % V.igluCore val cats = "org.typelevel" %% "cats" % V.cats val catsFree = "org.typelevel" %% "cats-free" % V.cats val circeCore = "io.circe" %% "circe-core" % V.circe diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscovery.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscovery.scala index fed28b350..8f8711c98 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscovery.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ManifestDiscovery.scala @@ -151,8 +151,9 @@ object ManifestDiscovery { val version = Semver.decodeSemver(record.application.version).toValidatedNel val types = record.payload.flatMap(RdbPayload.ShreddedTypesGet).map(_.shreddedTypes).getOrElse(Set.empty) val schemaKeys = types.toList.traverse { t => SchemaKey.fromUri(t) match { - case Some(ss) => ss.validNel[String] - case None => s"Key [$t] is invalid Iglu URI".invalidNel[SchemaKey] + case Some(ss) if ss.version.getModel.isEmpty => s"No support for partial schema: ${ss.toSchemaUri}".invalidNel[(SchemaKey, Int)] + case Some(ss) => (ss, ss.version.getModel.get).validNel[String] + case None => s"Key [$t] is invalid Iglu URI".invalidNel[(SchemaKey, Int)] }} val base = S3.Folder @@ -160,8 +161,10 @@ object ManifestDiscovery { .leftMap(message => s"Path [${record.itemId}] is not valid base for shredded type. $message") .toValidatedNel - (version, schemaKeys, base).mapN { (v: Semver, k: List[SchemaKey], b: S3.Folder) => - k.map(kk => ShreddedType.Info(b, kk.vendor, kk.name, kk.version.model, v)) + (version, schemaKeys, base).mapN { (v: Semver, k: List[(SchemaKey, Int)], b: S3.Folder) => + k.map { case (keyInfo, model) => + ShreddedType.Info(b, keyInfo.vendor, keyInfo.name, model, v) + } } match { case Validated.Valid(infos) => infos.distinct.asRight case Validated.Invalid(errors) => diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala index 7f907903f..e426566d2 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/ShreddedType.scala @@ -18,16 +18,12 @@ import cats.free.Free import cats.implicits._ import com.snowplowanalytics.iglu.client.SchemaCriterion - -import com.snowplowanalytics.iglu.core.SchemaKey +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.rdbloader.LoaderError._ import com.snowplowanalytics.snowplow.rdbloader.config.Semver import com.snowplowanalytics.snowplow.rdbloader.utils.Common.toSnakeCase -// This project - - /** * Container for S3 folder with shredded JSONs ready to load * Usually it represents self-describing event or custom/derived context @@ -214,11 +210,11 @@ object ShreddedType { val (bucket, path) = S3.splitS3Key(key) val (subpath, shredpath) = splitFilpath(path, shredJob) extractSchemaKey(shredpath, shredJob) match { - case Some(schemaKey) => + case Some(SchemaKey(vendor, name, _, SchemaVer.Full(model, _, _))) => val prefix = S3.Folder.coerce("s3://" + bucket + "/" + subpath) - val result = Info(prefix, schemaKey.vendor, schemaKey.name, schemaKey.version.model, shredJob) + val result = Info(prefix, vendor, name, model, shredJob) result.asRight - case None => + case _ => ShreddedTypeKeyFailure(key).asLeft } } @@ -233,14 +229,17 @@ object ShreddedType { * @param shredJob shred job version to decide what format should be present * @return valid schema key if found */ - def extractSchemaKey(subpath: String, shredJob: Semver): Option[SchemaKey] = - if (shredJob <= ShredJobBeforeSparkVersion) SchemaKey.fromPath(subpath) - else subpath match { + def extractSchemaKey(subpath: String, shredJob: Semver): Option[SchemaKey] = { + if (shredJob <= ShredJobBeforeSparkVersion) { + val uri = "iglu:" + subpath + SchemaKey.fromUri(uri) + } else subpath match { case ShreddedSubpathPattern(vendor, name, format, version) => - val igluPath = s"$vendor/$name/$format/$version" - SchemaKey.fromPath(igluPath) + val uri = s"iglu:$vendor/$name/$format/$version" + SchemaKey.fromUri(uri) case _ => None } + } /** * Split S3 filepath (without bucket name) into subpath and shreddedpath diff --git a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/TrackerInterpreter.scala b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/TrackerInterpreter.scala index f63d5b183..c078f09a2 100644 --- a/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/TrackerInterpreter.scala +++ b/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/interpreters/implementations/TrackerInterpreter.scala @@ -28,6 +28,7 @@ import org.joda.time.DateTime import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.snowplow.scalatracker._ +import com.snowplowanalytics.snowplow.scalatracker.emitters.TEmitter._ import com.snowplowanalytics.snowplow.scalatracker.emitters.{AsyncBatchEmitter, AsyncEmitter} // This project @@ -71,11 +72,11 @@ object TrackerInterpreter { case Some(Collector((host, port))) => val emitter = monitoring.snowplow.flatMap(_.method) match { case Some(GetMethod) => - AsyncEmitter.createAndStart(host, port = port) + AsyncEmitter.createAndStart(host, port = Some(port), callback = Some(callback)) case Some(PostMethod) => - AsyncBatchEmitter.createAndStart(host, port = port, bufferSize = 2) + AsyncBatchEmitter.createAndStart(host, port = Some(port), bufferSize = 2) case None => - AsyncEmitter.createAndStart(host, port = port) + AsyncEmitter.createAndStart(host, port = Some(port), callback = Some(callback)) } val tracker = new Tracker(List(emitter), "snowplow-rdb-loader", monitoring.snowplow.flatMap(_.appId).getOrElse("rdb-loader")) Some(tracker) @@ -103,7 +104,7 @@ object TrackerInterpreter { */ def trackSuccess(tracker: Option[Tracker]): Unit = tracker match { case Some(t) => - t.trackUnstructEvent(SelfDescribingJson(LoadSucceededSchema, JObject(Nil))) + t.trackSelfDescribingEvent(SelfDescribingData(LoadSucceededSchema, JObject(Nil))) case None => () }