Common: remove Processing Manifest (close #186)
chuwy committed Oct 23, 2020
1 parent 5fea0a1 commit b5d541b
Showing 22 changed files with 28 additions and 1,079 deletions.
2 changes: 0 additions & 2 deletions build.sbt
@@ -52,7 +52,6 @@ lazy val loader = project.in(file("."))
Dependencies.circeGeneric,
Dependencies.circeGenericExtra,
Dependencies.circeLiteral,
Dependencies.manifest,
Dependencies.fs2,
Dependencies.schemaDdl,

@@ -96,7 +95,6 @@ lazy val shredder = project.in(file("shredder"))
Dependencies.sparkCore,
Dependencies.sparkSQL,
Dependencies.igluCoreCirce,
Dependencies.manifest,
// Scala (test only)
Dependencies.circeOptics,
Dependencies.specs2,
1 change: 0 additions & 1 deletion project/Dependencies.scala
@@ -56,7 +56,6 @@ object Dependencies {
val igluClient = "com.snowplowanalytics" %% "iglu-scala-client" % V.igluClient
val scalaTracker = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.scalaTracker
val scalaTrackerEmit = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-id" % V.scalaTracker
val manifest = "com.snowplowanalytics" %% "snowplow-processing-manifest" % V.manifest
val badrows = "com.snowplowanalytics" %% "snowplow-badrows" % V.badrows
val igluCoreCirce = "com.snowplowanalytics" %% "iglu-core-circe" % V.igluCore
val cats = "org.typelevel" %% "cats" % V.cats

This file was deleted.

@@ -31,7 +31,6 @@ import io.circe.literal._
import java.util.UUID
import java.time.Instant

import scala.util.Try
import scala.util.control.NonFatal

// Spark
@@ -40,16 +39,10 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.storage.StorageLevel

// AWS SDK
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException

// Manifest
import com.snowplowanalytics.manifest.core.ManifestError
import com.snowplowanalytics.manifest.core.ManifestError._
import com.snowplowanalytics.manifest.core.ProcessingManifest._

import com.snowplowanalytics.snowplow.analytics.scalasdk.{ Event, ParsingError }
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.Contexts
import com.snowplowanalytics.snowplow.badrows.{ BadRow, Processor, Payload, Failure, FailureDetails }
@@ -59,7 +52,6 @@ import com.snowplowanalytics.snowplow.eventsmanifest.{ EventsManifest, EventsMan
import com.snowplowanalytics.iglu.core.SchemaVer
import com.snowplowanalytics.iglu.core.{ SchemaKey, SelfDescribingData }
import com.snowplowanalytics.iglu.client.{ Client, ClientError }
import DynamodbManifest.ShredderManifest
import rdbloader.common._
import rdbloader.generated.ProjectMetadata

@@ -166,13 +158,6 @@ object ShredJob extends SparkJob {

val job = new ShredJob(spark, shredConfig)

// Processing manifest, existing only on a driver. Iglu Resolver without cache
val manifest = shredConfig.getManifestData.map {
case (m, i) =>
val resolver = singleton.IgluSingleton.get(shredConfig.igluConfig)
ShredderManifest(DynamodbManifest.initialize(m, resolver.cacheless), i)
}

val atomicLengths = singleton.IgluSingleton.get(shredConfig.igluConfig).resolver.lookupSchema(AtomicSchema) match { // TODO: retry
case Right(schema) =>
EventUtils.getAtomicLengths(schema).fold(e => throw new RuntimeException(e), identity)
@@ -188,34 +173,7 @@ object ShredJob extends SparkJob {
config
}

runJob(manifest, eventsManifest, atomicLengths, job, true).get
}

/** Start a job, if necessary recording process to manifest */
def runJob(manifest: Option[ShredderManifest],
eventsManifest: Option[EventsManifestConfig],
lengths: Map[String, Int],
job: ShredJob,
jsonOnly: Boolean): Try[Unit] = {
manifest match {
case None => // Manifest is not enabled, simply run a job
Try(job.run(lengths, eventsManifest, jsonOnly)).map(_ => None)
case Some(ShredderManifest(manifest, itemId)) => // Manifest is enabled.
// Envelope job into function to pass to `Manifest.processItem` later
val process: ProcessNew = () => Try {
job.run(lengths, eventsManifest, jsonOnly)
val shreddedTypes = job.shreddedTypes.value.toSet
DynamodbManifest.processedPayload(shreddedTypes)
}

// Execute job in manifest transaction
val id = DynamodbManifest.normalizeItemId(itemId)
manifest.processNewItem(id, DynamodbManifest.ShredJobApplication, None, process) match {
case Right(_) => util.Success(())
case Left(ManifestError.ApplicationError(t, _, _)) => util.Failure(t) // Usual Spark exception
case Left(error) => util.Failure(FatalEtlError(error.show)) // Manifest-related exception
}
}
job.run(atomicLengths, eventsManifest, true)
}

/**
@@ -244,7 +202,6 @@ object ShredJob extends SparkJob {
BadRow.LoaderIgluError(processor, failure, payload)
}


/**
* The path at which to store the altered enriched events.
* @param outFolder shredded/good/run=xxx
@@ -17,28 +17,10 @@ package com.snowplowanalytics.snowplow.storage
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

import cats.Id

import io.circe.Json

import com.snowplowanalytics.iglu.client.{Client, Resolver}

import com.snowplowanalytics.snowplow.storage.spark.DynamodbManifest.ManifestFailure

package object spark {

private val Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

implicit class ResolverOps(resolver: Resolver[Id]) {
def cacheless: Resolver[ManifestFailure] =
Resolver(resolver.repos, None)
}

implicit class ClientOps(client: Client[Id, Json]) {
def cacheless: Client[ManifestFailure, Json] =
Client(client.resolver.cacheless, client.validator)
}

implicit class InstantOps(time: Instant) {
def formatted: String = {
time.atOffset(ZoneOffset.UTC).format(Formatter)
@@ -18,8 +18,6 @@ import cats.free.Free
import cats.data.EitherT
import cats.implicits._

import com.snowplowanalytics.manifest.core.{ Item, Application }

import com.snowplowanalytics.iglu.schemaddl.migrations.SchemaList

// This library
@@ -40,10 +38,6 @@ object LoaderA {
case class KeyExists(key: S3.Key) extends LoaderA[Boolean]
case class DownloadData(path: S3.Folder, dest: Path) extends LoaderA[Either[LoaderError, List[Path]]]

// Processing Manifest ops
case class ManifestDiscover(loader: Application, shredder: Application, predicate: Option[Item => Boolean]) extends LoaderA[Either[LoaderError, List[Item]]]
case class ManifestProcess(item: Item, load: LoaderAction[Unit]) extends LoaderA[Either[LoaderError, Unit]]

// Loading ops
case class ExecuteUpdate(sql: SqlString) extends LoaderA[Either[LoaderError, Long]]
case class CopyViaStdin(files: List[Path], sql: SqlString) extends LoaderA[Either[LoaderError, Long]]
@@ -88,14 +82,6 @@ object LoaderA {
def downloadData(source: S3.Folder, dest: Path): LoaderAction[List[Path]] =
EitherT(Free.liftF[LoaderA, Either[LoaderError, List[Path]]](DownloadData(source, dest)))

/** Discover data from manifest */
def manifestDiscover(loader: Application, shredder: Application, predicate: Option[Item => Boolean]): Action[Either[LoaderError, List[Item]]] =
Free.liftF[LoaderA, Either[LoaderError, List[Item]]](ManifestDiscover(loader, shredder, predicate))

/** Add Processing manifest records due loading */
def manifestProcess(item: Item, load: LoaderAction[Unit]): LoaderAction[Unit] =
EitherT(Free.liftF[LoaderA, Either[LoaderError, Unit]](ManifestProcess(item, load)))

/** Execute single SQL statement (against target in interpreter) */
def executeUpdate(sql: SqlString): LoaderAction[Long] =
EitherT(Free.liftF[LoaderA, Either[LoaderError, Long]](ExecuteUpdate(sql)))
@@ -16,8 +16,6 @@ import cats.Show
import cats.implicits._
import cats.data.ValidatedNel

import com.snowplowanalytics.manifest.core.ManifestError

import com.snowplowanalytics.snowplow.rdbloader.discovery.DiscoveryFailure

/** Root error type */
@@ -64,14 +62,6 @@ object LoaderError {
def flattenValidated[A](validated: ValidatedNel[DiscoveryFailure, A]): Either[LoaderError, A] =
validated.leftMap(errors => DiscoveryError(errors.toList): LoaderError).toEither

def fromManifestError(manifestError: ManifestError): LoaderError =
DiscoveryFailure.ManifestFailure(manifestError).toLoaderError

/** Exception wrapper to pass to processing manifest */
case class LoaderThrowable(origin: LoaderError) extends Throwable {
override def getMessage: String = origin.show
}

/** Other errors */
case class LoaderLocalError(message: String) extends LoaderError

@@ -20,8 +20,6 @@ import cats.data._
import cats.free.Free
import cats.implicits._

import com.snowplowanalytics.manifest.core.Item

import com.snowplowanalytics.snowplow.rdbloader.config.Semver
import com.snowplowanalytics.snowplow.rdbloader.LoaderError._

@@ -32,15 +30,13 @@ import com.snowplowanalytics.snowplow.rdbloader.LoaderError._
* @param shreddedTypes list of shredded types in this directory
* @param specificFolder if specific target loader was provided by `--folder`
* (remains default `false` until `setSpecificFolder`)
* @param item Processing Manifest records if it was discovered through manifest
*/
case class DataDiscovery(
base: S3.Folder,
atomicCardinality: Option[Long],
atomicSize: Option[Long],
shreddedTypes: List[ShreddedType],
specificFolder: Boolean,
item: Option[Item]) {
specificFolder: Boolean) {
/** ETL id */
def runId: String = base.split("/").last

@@ -97,7 +93,6 @@ object DataDiscovery {
sealed trait DiscoveryTarget extends Product with Serializable
case class Global(folder: S3.Folder) extends DiscoveryTarget
case class InSpecificFolder(folder: S3.Folder) extends DiscoveryTarget
case class ViaManifest(folder: Option[S3.Folder]) extends DiscoveryTarget

/**
* Discover list of shred run folders, each containing
@@ -145,10 +140,6 @@
LoaderAction.liftA(LoaderA.print("More than one folder discovered with `--folder` option"))
} else LoaderAction.unit
} yield discoveries
case ViaManifest(None) =>
ManifestDiscovery.discover(id, region, assets)
case ViaManifest(Some(folder)) =>
ManifestDiscovery.discoverFolder(folder, id, region, assets).map(_.pure[List])
}

setSpecificFolder(target, result)
@@ -160,8 +151,6 @@
F.map(discovery) { d =>
target match {
case InSpecificFolder(_) => d.copy(specificFolder = true)
case ViaManifest(Some(_)) => d.copy(specificFolder = true)
case ViaManifest(None) => d.copy(specificFolder = false)
case Global(_) => d.copy(specificFolder = false)
}
}
@@ -216,7 +205,7 @@ object DataDiscovery {
if (atomicKeys.nonEmpty) {
val shreddedData = shreddedKeys.map(_.info).distinct
val size = Some(atomicKeys.foldMap(_.size))
DataDiscovery(base, Some(atomicKeys.length), size, shreddedData, false, None).validNel
DataDiscovery(base, Some(atomicKeys.length), size, shreddedData, false).validNel
} else {
DiscoveryFailure.AtomicDiscoveryFailure(base).invalidNel
}
@@ -242,7 +231,7 @@
* @param dataKey either successful or failed data key
* @param region AWS region for S3 buckets
* @param assets optional JSONPath assets S3 bucket
* @return `Action` conaining `Validation` - as on next step we can aggregate errors
* @return `Action` containing `Validation` - as on next step we can aggregate errors
*/
private def transformDataKey(
dataKey: DiscoveryStep[DataKeyIntermediate],
@@ -15,8 +15,6 @@ package com.snowplowanalytics.snowplow.rdbloader.discovery
import cats.data.NonEmptyList
import cats.syntax.show._

import com.snowplowanalytics.manifest.core.ManifestError

import com.snowplowanalytics.snowplow.rdbloader.{S3, LoaderError}

/**
@@ -82,11 +80,6 @@ object DiscoveryFailure {
def getMessage: String =
s"No data discovered in [$path], while RDB Loader was explicitly pointed to it by '--folder' option. " +
s"Possible reasons: S3 eventual consistency or folder does not contain any files"

// Message for enabled manifest
def getManifestMessage: String =
s"Processing manifest does not have unprocessed item [$path]. It can be there, but " +
"already loaded by RDB Loader or unprocessed by RDB Shredder"
}

/**
@@ -97,11 +90,6 @@
s"Cannot extract contexts or self-describing events from directory [$path].\nInvalid key example: $example. Total $invalidKeyCount invalid keys.\nCorrupted shredded/good state or unexpected Snowplow Shred job version"
}

case class ManifestFailure(manifestError: ManifestError) extends DiscoveryFailure {
def getMessage: String = manifestError.show
override def toString: String = getMessage
}

/** Aggregate some failures into more compact error-list to not pollute end-error */
def aggregateDiscoveryFailures(failures: NonEmptyList[DiscoveryFailure]): List[DiscoveryFailure] = {
val (shreddedTypeFailures, otherFailures) = failures.toList.span(_.isInstanceOf[DiscoveryFailure.ShreddedTypeKeyFailure])