Skip to content

Commit

Permalink
Amendment to #763
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Apr 8, 2023
1 parent 100646a commit d372ccb
Show file tree
Hide file tree
Showing 15 changed files with 138 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import fs2.io.file.{exists, move, readAll, tempFileResource, writeAll}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, HttpClient}
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, HttpClient, ShiftExecution}

import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients

Expand Down Expand Up @@ -134,6 +134,7 @@ object Assets {
/** Initializes the [[updateStream]] if refresh period is specified. */
def run[F[_]: ConcurrentEffect: ContextShift: HttpClient: Parallel: Timer, A](
blocker: Blocker,
shifter: ShiftExecution[F],
sem: Semaphore[F],
updatePeriod: Option[FiniteDuration],
assetsState: Assets.State[F],
Expand All @@ -144,7 +145,7 @@ object Assets {
val init = for {
_ <- Logger[F].info(show"Assets will be checked every $interval")
assets <- enrichments.get.map(_.configs.flatMap(_.filesToCache))
} yield updateStream[F](blocker, sem, assetsState, enrichments, interval, assets)
} yield updateStream[F](blocker, shifter, sem, assetsState, enrichments, interval, assets)
Stream.eval(init).flatten
case None =>
Stream.empty.covary[F]
Expand All @@ -156,6 +157,7 @@ object Assets {
*/
def updateStream[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer: HttpClient](
blocker: Blocker,
shifter: ShiftExecution[F],
sem: Semaphore[F],
state: State[F],
enrichments: Ref[F, Environment.Enrichments[F]],
Expand All @@ -174,7 +176,7 @@ object Assets {
Logger[F].info("All the assets are still the same, no update")
else
sem.withPermit {
update(blocker, state, enrichments, newAssets)
update(blocker, shifter, state, enrichments, newAssets)
}
}
} yield ()
Expand Down Expand Up @@ -223,6 +225,7 @@ object Assets {
*/
def update[F[_]: ConcurrentEffect: ContextShift: HttpClient](
blocker: Blocker,
shifter: ShiftExecution[F],
state: State[F],
enrichments: Ref[F, Environment.Enrichments[F]],
newAssets: List[Downloaded]
Expand All @@ -240,7 +243,7 @@ object Assets {

_ <- Logger[F].info("Reinitializing enrichments")
old <- enrichments.get
fresh <- old.reinitialize(BlockerF.ofBlocker(blocker))
fresh <- old.reinitialize(BlockerF.ofBlocker(blocker), shifter)
_ <- enrichments.set(fresh)
} yield ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.RemoteAdap
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, HttpClient}
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, HttpClient, ShiftExecution}

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{ConfigFile, ParsedConfigs}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{
Expand Down Expand Up @@ -71,6 +71,7 @@ import java.util.concurrent.TimeoutException
* have to be replaced with newer ones
* @param httpClient client used to perform HTTP requests
* @param blocker thread pool for blocking operations and enrichments themselves
* @param shifter thread pool for blocking jdbc operations in the SqlEnrichment
* @param source stream of records containing the collector payloads
* @param sinkGood function that sinks enriched event
* @param sinkPii function that sinks pii event
Expand Down Expand Up @@ -103,6 +104,7 @@ final case class Environment[F[_], A](
httpClient: Http4sClient[F],
remoteAdapterHttpClient: Option[Http4sClient[F]],
blocker: Blocker,
shifter: ShiftExecution[F],
source: Stream[F, A],
adapterRegistry: AdapterRegistry,
sinkGood: AttributedByteSink[F],
Expand Down Expand Up @@ -135,21 +137,29 @@ object Environment {
final case class Enrichments[F[_]: Clock](registry: EnrichmentRegistry[F], configs: List[EnrichmentConf]) {

/** Initialize same enrichments, specified by configs (in case DB files updated) */
def reinitialize(blocker: BlockerF[F])(implicit A: Async[F], C: HttpClient[F]): F[Enrichments[F]] =
Enrichments.buildRegistry(configs, blocker).map(registry => Enrichments(registry, configs))
def reinitialize(blocker: BlockerF[F], shifter: ShiftExecution[F])(implicit A: Async[F], C: HttpClient[F]): F[Enrichments[F]] =
Enrichments.buildRegistry(configs, blocker, shifter).map(registry => Enrichments(registry, configs))
}

object Enrichments {
def make[F[_]: Async: Clock: HttpClient](configs: List[EnrichmentConf], blocker: BlockerF[F]): Resource[F, Ref[F, Enrichments[F]]] =
def make[F[_]: Async: Clock: HttpClient](
configs: List[EnrichmentConf],
blocker: BlockerF[F],
shifter: ShiftExecution[F]
): Resource[F, Ref[F, Enrichments[F]]] =
Resource.eval {
for {
registry <- buildRegistry[F](configs, blocker)
registry <- buildRegistry[F](configs, blocker, shifter)
ref <- Ref.of(Enrichments[F](registry, configs))
} yield ref
}

def buildRegistry[F[_]: Async: HttpClient: Clock](configs: List[EnrichmentConf], blocker: BlockerF[F]) =
EnrichmentRegistry.build[F](configs, blocker).value.flatMap {
def buildRegistry[F[_]: Async: HttpClient: Clock](
configs: List[EnrichmentConf],
blocker: BlockerF[F],
shifter: ShiftExecution[F]
) =
EnrichmentRegistry.build[F](configs, blocker, shifter).value.flatMap {
case Right(reg) => Async[F].pure(reg)
case Left(error) => Async[F].raiseError[EnrichmentRegistry[F]](new RuntimeException(error))
}
Expand Down Expand Up @@ -191,9 +201,10 @@ object Environment {
adapterRegistry = new AdapterRegistry(remoteAdapters)
sem <- Resource.eval(Semaphore(1L))
assetsState <- Resource.eval(Assets.State.make[F](blocker, sem, clts, assets))
shifter <- ShiftExecution.ofSingleThread[F]
enrichments <- {
implicit val C: Http4sClient[F] = http
Enrichments.make[F](parsedConfigs.enrichmentConfigs, BlockerF.ofBlocker(blocker))
Enrichments.make[F](parsedConfigs.enrichmentConfigs, BlockerF.ofBlocker(blocker), shifter)
}
} yield Environment[F, A](
igluClient,
Expand All @@ -204,6 +215,7 @@ object Environment {
http,
remoteAdaptersHttpClient,
blocker,
shifter,
source,
adapterRegistry,
good,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ object Run {
}
val updates = {
implicit val httpClient: Http4sClient[F] = env.httpClient
Assets.run[F, A](env.blocker, env.semaphore, env.assetsUpdatePeriod, env.assetsState, env.enrichments)
Assets.run[F, A](env.blocker, env.shifter, env.semaphore, env.assetsUpdatePeriod, env.assetsState, env.enrichments)
}
val telemetry = Telemetry.run[F, A](env)
val reporting = env.metrics.report
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import cats.effect.{Blocker, IO, Resource}
import cats.effect.concurrent.Semaphore

import cats.effect.testing.specs2.CatsIO
import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, ShiftExecution}

import com.snowplowanalytics.snowplow.enrich.common.fs2.test._
import org.http4s.client.{Client => Http4sClient}
Expand Down Expand Up @@ -123,15 +123,15 @@ class AssetsSpec extends Specification with CatsIO with ScalaCheck {
for {
blocker <- Blocker[IO]
sem <- Resource.eval(Semaphore[IO](1L))
enrichments <- Environment.Enrichments.make[IO](List(), BlockerF.noop)
enrichments <- Environment.Enrichments.make[IO](List(), BlockerF.noop, ShiftExecution.noop)
_ <- SpecHelpers.filesResource(blocker, TestFiles)
} yield (blocker, sem, enrichments)

val update = Stream
.resource(resources)
.flatMap {
case (blocker, sem, enrichments) =>
Assets.updateStream[IO](blocker, sem, state, enrichments, 1.second, List(uri -> filename))
Assets.updateStream[IO](blocker, ShiftExecution.noop, sem, state, enrichments, 1.second, List(uri -> filename))
}
.haltAfter(2.second)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import com.snowplowanalytics.iglu.client.resolver.registries.Registry
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, ShiftExecution}
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry

Expand Down Expand Up @@ -140,7 +140,7 @@ object BlackBoxTesting extends Specification with CatsIO {
case Invalid(e) => IO.raiseError(new IllegalArgumentException(s"can't parse enrichmentsJson: $e"))
case Valid(list) => IO.pure(list)
}
built <- EnrichmentRegistry.build[IO](confs, BlockerF.noop).value
built <- EnrichmentRegistry.build[IO](confs, BlockerF.noop, ShiftExecution.noop).value
registry <- built match {
case Left(e) => IO.raiseError(new IllegalArgumentException(s"can't build EnrichmentRegistry: $e"))
case Right(r) => IO.pure(r)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import com.snowplowanalytics.snowplow.badrows.BadRow
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, ShiftExecution}

import com.snowplowanalytics.snowplow.enrich.common.fs2.{Assets, AttributedData, Enrich, EnrichSpec, Environment}
import com.snowplowanalytics.snowplow.enrich.common.fs2.Environment.{Enrichments, StreamsSettings}
Expand Down Expand Up @@ -80,6 +80,7 @@ case class TestEnvironment[A](
.run[IO, A](updatedEnv)
.merge(
Assets.run[IO, A](updatedEnv.blocker,
updatedEnv.shifter,
updatedEnv.semaphore,
updatedEnv.assetsUpdatePeriod,
updatedEnv.assetsState,
Expand Down Expand Up @@ -136,9 +137,10 @@ object TestEnvironment extends CatsIO {
clients = Clients.init[IO](http, Nil)
sem <- Resource.eval(Semaphore[IO](1L))
assetsState <- Resource.eval(Assets.State.make(blocker, sem, clients, enrichments.flatMap(_.filesToCache)))
shifter <- ShiftExecution.ofSingleThread
enrichmentsRef <- {
implicit val client: Http4sClient[IO] = http
Enrichments.make[IO](enrichments, BlockerF.ofBlocker(blocker))
Enrichments.make[IO](enrichments, BlockerF.ofBlocker(blocker), shifter)
}
goodRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty))
piiRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty))
Expand All @@ -153,6 +155,7 @@ object TestEnvironment extends CatsIO {
http,
Some(http),
blocker,
shifter,
source,
adapterRegistry,
g => goodRef.update(_ ++ g),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import com.snowplowanalytics.weather.providers.openweather.CreateOWM

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf._

import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, CirceUtils}
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, CirceUtils, ShiftExecution}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry._
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequest.ApiRequestEnrichment
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.pii.PiiPseudonymizerEnrichment
Expand Down Expand Up @@ -109,7 +109,8 @@ object EnrichmentRegistry {
F[_]: Monad: CreateForex: CreateIabClient: CreateIpLookups: CreateOWM: CreateParser: CreateUaParserEnrichment: sqlquery.CreateSqlQueryEnrichment: apirequest.CreateApiRequestEnrichment
](
confs: List[EnrichmentConf],
blocker: BlockerF[F]
blocker: BlockerF[F],
shifter: ShiftExecution[F]
): EitherT[F, String, EnrichmentRegistry[F]] =
confs.foldLeft(EitherT.pure[F, String](EnrichmentRegistry[F]())) { (er, e) =>
e match {
Expand All @@ -121,7 +122,7 @@ object EnrichmentRegistry {
case c: PiiPseudonymizerConf => er.map(_.copy(piiPseudonymizer = c.enrichment.some))
case c: SqlQueryConf =>
for {
enrichment <- EitherT.right(c.enrichment[F](blocker))
enrichment <- EitherT.right(c.enrichment[F](blocker, shifter))
registry <- er
} yield registry.copy(sqlQuery = enrichment.some)
case c: AnonIpConf => er.map(_.copy(anonIp = c.enrichment.some))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequ
HttpApi
}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery.{CreateSqlQueryEnrichment, Rdbms, SqlQueryEnrichment}
import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, ShiftExecution}

sealed trait EnrichmentConf {

Expand Down Expand Up @@ -78,8 +78,8 @@ object EnrichmentConf {
output: sqlquery.Output,
cache: SqlQueryEnrichment.Cache
) extends EnrichmentConf {
def enrichment[F[_]: Monad: CreateSqlQueryEnrichment](blocker: BlockerF[F]): F[SqlQueryEnrichment[F]] =
SqlQueryEnrichment[F](this, blocker)
def enrichment[F[_]: Monad: CreateSqlQueryEnrichment](blocker: BlockerF[F], shifter: ShiftExecution[F]): F[SqlQueryEnrichment[F]] =
SqlQueryEnrichment[F](this, blocker, shifter)
}

final case class AnonIpConf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.CachingEvaluator
import com.zaxxer.hikari.HikariDataSource
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.SqlQueryConf
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, ResourceF}
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, ResourceF, ShiftExecution}
import io.circe.Json

import scala.collection.immutable.IntMap

/** Initialize resources, necessary for SQL Query enrichment: cache and connection */
sealed trait CreateSqlQueryEnrichment[F[_]] {
def create(conf: SqlQueryConf, blocker: BlockerF[F]): F[SqlQueryEnrichment[F]]
def create(
conf: SqlQueryConf,
blocker: BlockerF[F],
shifter: ShiftExecution[F]
): F[SqlQueryEnrichment[F]]
}

object CreateSqlQueryEnrichment {
Expand All @@ -37,7 +41,11 @@ object CreateSqlQueryEnrichment {
implicit CLM: SqlCacheInit[F]
): CreateSqlQueryEnrichment[F] =
new CreateSqlQueryEnrichment[F] {
def create(conf: SqlQueryConf, blocker: BlockerF[F]): F[SqlQueryEnrichment[F]] = {
def create(
conf: SqlQueryConf,
blocker: BlockerF[F],
shifter: ShiftExecution[F]
): F[SqlQueryEnrichment[F]] = {
val cacheConfig = CachingEvaluator.Config(
size = conf.cache.size,
successTtl = conf.cache.ttl,
Expand All @@ -55,6 +63,7 @@ object CreateSqlQueryEnrichment {
conf.output,
evaluator,
blocker,
shifter,
getDataSource(conf.db)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.snowplowanalytics.snowplow.badrows.FailureDetails
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.SqlQueryConf
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, ParseableEnrichment}
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, CirceUtils, ResourceF}
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, CirceUtils, ResourceF, ShiftExecution}
import io.circe._
import io.circe.generic.semiauto._
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -77,8 +77,12 @@ object SqlQueryEnrichment extends ParseableEnrichment {
).mapN(SqlQueryConf(schemaKey, _, _, _, _, _)).toEither
}.toValidated

def apply[F[_]: CreateSqlQueryEnrichment](conf: SqlQueryConf, blocker: BlockerF[F]): F[SqlQueryEnrichment[F]] =
CreateSqlQueryEnrichment[F].create(conf, blocker)
def apply[F[_]: CreateSqlQueryEnrichment](
conf: SqlQueryConf,
blocker: BlockerF[F],
ec: ShiftExecution[F]
): F[SqlQueryEnrichment[F]] =
CreateSqlQueryEnrichment[F].create(conf, blocker, ec)

/** Just a string with SQL, not escaped */
final case class Query(sql: String) extends AnyVal
Expand Down Expand Up @@ -111,6 +115,7 @@ final case class SqlQueryEnrichment[F[_]: Monad: DbExecutor: ResourceF: Clock](
output: Output,
sqlQueryEvaluator: SqlQueryEvaluator[F],
blocker: BlockerF[F],
shifter: ShiftExecution[F],
dataSource: DataSource
) extends Enrichment {
private val enrichmentInfo =
Expand Down Expand Up @@ -189,7 +194,7 @@ final case class SqlQueryEnrichment[F[_]: Monad: DbExecutor: ResourceF: Clock](
EitherT {
sqlQueryEvaluator.evaluateForKey(
intMap,
getResult = () => blocker.blockOn(query(connection, intMap).value)
getResult = () => shifter.shift(query(connection, intMap).value)
)
}
.leftMap { t =>
Expand Down
Loading

0 comments on commit d372ccb

Please sign in to comment.