Skip to content

Commit

Permalink
RDB Loader: add discovery through SQS stream (close #234)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Dec 7, 2020
1 parent c4762e2 commit a03c098
Show file tree
Hide file tree
Showing 40 changed files with 697 additions and 605 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ lazy val common = project.in(file("modules/common"))
.settings(resolvers ++= Dependencies.resolutionRepos)
.settings(
libraryDependencies ++= Seq(
Dependencies.fs2Aws,
Dependencies.slf4j,
Dependencies.analyticsSdk,
Dependencies.badrows,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.utils
package com.snowplowanalytics.snowplow.rdbloader.common

import cats.data._

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.common

import java.time.Instant

import cats.implicits._

import io.circe.{Encoder, DecodingFailure, Decoder, Json}
import io.circe.generic.semiauto._
import io.circe.parser.parse
import io.circe.syntax._

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.implicits._

/** Common type of message RDB Loader can receive from Shredder or other apps */
sealed trait LoaderMessage {
import LoaderMessage._

/** Convert to self-describing JSON */
def selfDescribingData: SelfDescribingData[Json] =
this match {
case _: LoaderMessage.ShreddingComplete =>
SelfDescribingData(LoaderMessage.ShreddingCompleteKey, (this: LoaderMessage).asJson)
}
}

object LoaderMessage {

val ShreddingCompleteKey: SchemaKey =
SchemaKey("com.snowplowanalytics.snowplow.storage.rdbloader", "shredding_complete", "jsonschema", SchemaVer.Full(1,0,0))

/** Data format for shredded data */
sealed trait Format extends Product with Serializable
object Format {
final case object TSV extends Format
final case object JSON extends Format
// Another options can be Parquet and InAtomic for Snowflake-like structure

implicit val loaderMessageFormatEncoder: Encoder[Format] =
Encoder.instance(_.toString.asJson)
implicit val loaderMessageFormatDecoder: Decoder[Format] =
Decoder.instance { c => c.as[String] match {
case Right("TSV") => Format.TSV.asRight
case Right("JSON") => Format.JSON.asRight
case Right(other) => DecodingFailure(s"$other is unexpected format", c.history).asLeft
case Left(error) => error.asLeft
} }
}

/**
* Set of timestamps coming from shredder
* @param jobStarted time shred job has started processing the batch
* @param jobCompleted time shred job has finished processing the batch
* @param min earliest collector timestamp in the batch
* @param max latest collector timestamp in the batch
*/
final case class Timestamps(jobStarted: Instant,
jobCompleted: Instant,
min: Option[Instant],
max: Option[Instant])

final case class ShreddedType(schemaKey: SchemaKey, format: Format)

final case class Processor(artifact: String, version: Semver)

/**
* Message signalling that shredder has finished and data ready to be loaded
* @param base root of the shredded data
* @param types all shredded types found in the batch
* @param timestamps set of auxiliary timestamps known to shredder
* @param processor shredder application metadata
*/
final case class ShreddingComplete(base: S3.Folder,
types: List[ShreddedType],
timestamps: Timestamps,
processor: Processor) extends LoaderMessage

/** Parse raw string into self-describing JSON with [[LoaderMessage]] */
def fromString(s: String): Either[String, LoaderMessage] =
parse(s)
.leftMap(_.show)
.flatMap(json => SelfDescribingData.parse(json).leftMap(e => s"JSON message [${json.noSpaces}] is not self-describing, ${e.code}"))
.flatMap {
case SelfDescribingData(SchemaKey("com.snowplowanalytics.snowplow.storage.rdbloader", "shredding_complete", _, SchemaVer.Full(1, _, _)), data) =>
data.as[ShreddingComplete].leftMap(e => s"Cannot decode valid ShreddingComplete payload from [${data.noSpaces}], ${e.show}")
}

implicit val loaderMessageTimestampsEncoder: Encoder[Timestamps] =
deriveEncoder[Timestamps]
implicit val loaderMessageTimestampsDecoder: Decoder[Timestamps] =
deriveDecoder[Timestamps]
implicit val loaderMessageShreddedTypeEncoder: Encoder[ShreddedType] =
deriveEncoder[ShreddedType]
implicit val loaderMessageShreddedTypeDecoder: Decoder[ShreddedType] =
deriveDecoder[ShreddedType]
implicit val loaderMessageProcessorEncoder: Encoder[Processor] =
deriveEncoder[Processor]
implicit val loaderMessageProcessorDecoder: Decoder[Processor] =
deriveDecoder[Processor]
implicit val loaderMessageShreddingCompleteEncoder: Encoder[LoaderMessage] =
deriveEncoder[ShreddingComplete].contramap { case e: ShreddingComplete => e }
implicit val loaderMessageShreddingCompleteDecoder: Decoder[ShreddingComplete] =
deriveDecoder[ShreddingComplete]

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.common

import javax.jms.{TextMessage, Message => JMessage}
import cats.syntax.either._

import cats.effect.Sync

case class Message[F[_], A](data: A, ack: F[Unit])

object Message {
implicit def messageDecoder[F[_]: Sync](message: JMessage): Either[Throwable, Message[F, String]] = {
val text = message.asInstanceOf[TextMessage].getText
val ack = Sync[F].delay(message.acknowledge())
Message(text, ack).asRight
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.utils
package com.snowplowanalytics.snowplow.rdbloader.common

import cats.syntax.either._
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.snowplowanalytics.snowplow.rdbloader.loaders
import io.circe.Decoder

import io.circe.{Decoder, Encoder, Json}

import shapeless.tag
import shapeless.tag._

Expand Down Expand Up @@ -52,11 +52,6 @@ object S3 {
coerce(string)
}

private def correctlyPrefixed(s: String): Boolean =
supportedPrefixes.foldLeft(false) { (result, prefix) =>
result || s.startsWith(s"$prefix://")
}

private def appendTrailingSlash(s: String): String =
if (s.endsWith("/")) s
else s + "/"
Expand Down Expand Up @@ -86,7 +81,7 @@ object S3 {
*/
def getAtomicPath(s: Key): Option[Folder] =
s match {
case loaders.Common.atomicSubpathPattern(prefix, subpath, _) =>
case AtomicSubpathPattern(prefix, subpath, _) =>
Some(Folder.coerce(prefix + "/" + subpath))
case _ => None
}
Expand Down Expand Up @@ -118,21 +113,15 @@ object S3 {
}
}

/**
* Transform S3 object summary into valid S3 key string
*/
def getKey(s3ObjectSummary: S3ObjectSummary): S3.BlobObject = {
val key = S3.Key.coerce(s"s3://${s3ObjectSummary.getBucketName}/${s3ObjectSummary.getKey}")
S3.BlobObject(key, s3ObjectSummary.getSize)
}

// Tags for refined types
sealed trait S3FolderTag
sealed trait S3KeyTag
sealed trait AtomicEventsKeyTag

implicit val s3FolderDecoder: Decoder[Folder] =
Decoder.decodeString.emap(Folder.parse)
implicit val s3FolderEncoder: Encoder[Folder] =
Encoder.instance(Json.fromString)

/**
* Split S3 path into bucket name and folder path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,21 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader
package config
package com.snowplowanalytics.snowplow.rdbloader.common

import cats.implicits._
import cats.{PartialOrder, Show}

import io.circe.Decoder
import io.circe.{ Decoder, Encoder }
import io.circe.syntax._

// This project
import utils.Common.IntString
import com.snowplowanalytics.snowplow.rdbloader.common.Common.IntString

/**
* Snowplow-specific semantic versioning for apps, libs and jobs,
* with defined decoders and ordering to compare different versions
*/
case class Semver(major: Int, minor: Int, patch: Int, prerelease: Option[Semver.Prerelease])
final case class Semver(major: Int, minor: Int, patch: Int, prerelease: Option[Semver.Prerelease])

/**
* Helpers for Snowplow-specific semantic versioning
Expand All @@ -38,9 +37,11 @@ object Semver {
* unknown as last resort - will match any string
*/
sealed trait Prerelease { def full: String }
case class Milestone(version: Int) extends Prerelease { def full = s"-M$version"}
case class ReleaseCandidate(version: Int) extends Prerelease { def full = s"-rc$version"}
case class Unknown(full: String) extends Prerelease
object Prerelease {
final case class Milestone(version: Int) extends Prerelease { def full = s"-M$version"}
final case class ReleaseCandidate(version: Int) extends Prerelease { def full = s"-rc$version"}
final case class Unknown(full: String) extends Prerelease
}


val semverPattern = """^(\d+)\.(\d+)\.(\d+)(.*)$""".r
Expand All @@ -63,9 +64,9 @@ object Semver {
case (Some(_), None) => -1
case (None, Some(_)) => 1
case (None, None) => 0
case (Some(Milestone(xm)), Some(Milestone(ym))) =>
case (Some(Prerelease.Milestone(xm)), Some(Prerelease.Milestone(ym))) =>
xm.partialCompare(ym)
case (Some(ReleaseCandidate(xrc)), Some(ReleaseCandidate(yrc))) =>
case (Some(Prerelease.ReleaseCandidate(xrc)), Some(Prerelease.ReleaseCandidate(yrc))) =>
xrc.partialCompare(yrc)
case _ => Double.NaN
}
Expand All @@ -90,9 +91,9 @@ object Semver {
* Any string can be decoded as last-resort `Unknown`
*/
def decodePrerelease(s: String): Prerelease = s match {
case milestonePattern(IntString(m)) => Milestone(m)
case rcPattern(IntString(rc)) => ReleaseCandidate(rc)
case _ => Unknown(s)
case milestonePattern(IntString(m)) => Prerelease.Milestone(m)
case rcPattern(IntString(rc)) => Prerelease.ReleaseCandidate(rc)
case _ => Prerelease.Unknown(s)
}

/**
Expand All @@ -108,12 +109,6 @@ object Semver {
Left(s"Version [$s] doesn't match Semantic Version pattern")
}

/**
* Circe decoder for semantic version
*/
implicit val semverDecoder =
Decoder.decodeString.emap(decodeSemver)

private implicit val prereleaseShow = new Show[Option[Prerelease]] {
def show(prerelease: Option[Prerelease]) = prerelease match {
case Some(p) => p.full
Expand All @@ -125,5 +120,11 @@ object Semver {
def show(version: Semver) =
s"${version.major}.${version.minor}.${version.patch}${version.prerelease.show}"
}

/** Circe codecs for semantic version */
implicit val semverDecoder: Decoder[Semver] =
Decoder.decodeString.emap(decodeSemver)
implicit val semverEncoder: Encoder[Semver] =
Encoder.instance(ver => ver.show.asJson)
}

Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ sealed trait StorageTarget extends Product with Serializable {
def sshTunnel: Option[StorageTarget.TunnelConfig]

def blacklistTabular: Option[List[SchemaCriterion]] // None means tabular is disabled
def messageQueue: Option[String]
}

object StorageTarget {
Expand All @@ -74,6 +75,7 @@ object StorageTarget {
*/
case class RedshiftConfig(id: UUID,
name: String,

host: String,
database: String,
port: Int,
Expand All @@ -85,7 +87,9 @@ object StorageTarget {
maxError: Int,
compRows: Long,
sshTunnel: Option[TunnelConfig],
blacklistTabular: Option[List[SchemaCriterion]])

blacklistTabular: Option[List[SchemaCriterion]],
messageQueue: Option[String])
extends StorageTarget

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import com.snowplowanalytics.snowplow.scalatracker.UUIDProvider

package object common {

/** * Subpath to check `atomic-events` directory presence */
val AtomicSubpathPattern = "(.*)/(run=[0-9]{4}-[0-1][0-9]-[0-3][0-9]-[0-2][0-9]-[0-6][0-9]-[0-6][0-9]/atomic-events)/(.*)".r
// year month day hour minute second

implicit val catsClockIdInstance: Clock[Id] = new Clock[Id] {
override def realTime(unit: TimeUnit): Id[Long] =
unit.convert(System.currentTimeMillis(), MILLISECONDS)
Expand Down
Loading

0 comments on commit a03c098

Please sign in to comment.