Skip to content

Commit

Permalink
common: improve caching in API/SQL enrichments (close #747)
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Feb 17, 2023
1 parent 34e5b79 commit bb71d94
Show file tree
Hide file tree
Showing 13 changed files with 369 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ object Environment {
Slf4jLogger.getLogger[F]

/** Registry with all allocated clients (MaxMind, IAB etc) and their original configs */
final case class Enrichments[F[_]](registry: EnrichmentRegistry[F], configs: List[EnrichmentConf]) {
final case class Enrichments[F[_]: Clock](registry: EnrichmentRegistry[F], configs: List[EnrichmentConf]) {

/** Initialize same enrichments, specified by configs (in case DB files updated) */
def reinitialize(blocker: BlockerF[F])(implicit A: Async[F], C: HttpClient[F]): F[Enrichments[F]] =
Expand All @@ -148,7 +148,7 @@ object Environment {
} yield ref
}

def buildRegistry[F[_]: Async: HttpClient](configs: List[EnrichmentConf], blocker: BlockerF[F]) =
def buildRegistry[F[_]: Async: HttpClient: Clock](configs: List[EnrichmentConf], blocker: BlockerF[F]) =
EnrichmentRegistry.build[F](configs, blocker).value.flatMap {
case Right(reg) => Async[F].pure(reg)
case Left(error) => Async[F].raiseError[EnrichmentRegistry[F]](new RuntimeException(error))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright (c) 2023-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry

import cats.Monad
import cats.effect.Clock
import cats.implicits._
import com.snowplowanalytics.lrumap.{CreateLruMap, LruMap}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.CachingEvaluator.{Cache, CachedItem, GetResult, Value}

import java.util.concurrent.TimeUnit

final class CachingEvaluator[F[_], K, V](
cache: Cache[F, K, V],
config: CachingEvaluator.Config
) {

def evaluateForKey(key: K, getResult: GetResult[F, V])(implicit M: Monad[F], C: Clock[F]): F[Either[Throwable, V]] =
cache.get(key).flatMap {
case Some(cachedItem) =>
isExpired(cachedItem).flatMap {
case true =>
getResultAndCache(key, getResult, lastCachedValue = Some(cachedItem.value))
case false =>
Monad[F].pure(toEither(cachedItem.value))
}
case None =>
getResultAndCache(key, getResult, lastCachedValue = None)
}

private def getResultAndCache(
key: K,
getResult: GetResult[F, V],
lastCachedValue: Option[Value[V]]
)(
implicit M: Monad[F],
C: Clock[F]
): F[Either[Throwable, V]] =
getResult()
.map(freshResult => toCacheValue(lastCachedValue, freshResult))
.flatTap(freshResult => putToCache(key, freshResult))
.map(toEither)

private def toCacheValue(lastCachedValue: Option[Value[V]], freshResult: Either[Throwable, V]): Value[V] =
freshResult match {
case Right(value) =>
Value.Success(value)
case Left(freshError) =>
Value.Error(freshError, extractLastKnownSuccess(lastCachedValue))
}

private def extractLastKnownSuccess(lastCachedValue: Option[Value[V]]): Option[V] =
lastCachedValue match {
case Some(Value.Success(value)) => Some(value)
case Some(Value.Error(_, lastKnownSuccess)) => lastKnownSuccess
case None => None
}

private def toEither(value: Value[V]): Either[Throwable, V] =
value match {
case Value.Success(value) => Right(value)
case Value.Error(_, Some(lastSuccess)) => Right(lastSuccess)
case Value.Error(ex, None) => Left(ex)
}

private def putToCache(
key: K,
result: Value[V]
)(
implicit M: Monad[F],
C: Clock[F]
): F[Unit] =
for {
storedAt <- getCurrentTime
_ <- cache.put(key, CachedItem(result, storedAt))
} yield ()

private def isExpired(cachedItem: CachedItem[V])(implicit M: Monad[F], C: Clock[F]): F[Boolean] = {
val ttlToUse = resolveProperTtl(cachedItem.value)
getCurrentTime.map { currentTime =>
currentTime - cachedItem.storedAt > ttlToUse
}
}

private def resolveProperTtl(value: Value[V]): Int =
value match {
case _: Value.Success[V] => config.successTtl
case _: Value.Error[V] => config.errorTtl
}

private def getCurrentTime(implicit C: Clock[F]): F[Long] = C.realTime(TimeUnit.SECONDS)

}

object CachingEvaluator {

type Cache[F[_], K, V] = LruMap[F, K, CachedItem[V]]
type GetResult[F[_], V] = () => F[Either[Throwable, V]]

sealed trait Value[+V]
object Value {
final case class Success[V](value: V) extends Value[V]
final case class Error[V](value: Throwable, lastKnownSuccess: Option[V]) extends Value[V]
}

final case class CachedItem[V](value: Value[V], storedAt: Long)

final case class Config(
size: Int,
successTtl: Int,
errorTtl: Int
)

def create[F[_]: Monad, K, V](config: Config)(implicit CLM: CreateLruMap[F, K, CachedItem[V]]): F[CachingEvaluator[F, K, V]] =
CLM
.create(config.size)
.map(cache => new CachingEvaluator(cache, config))
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,28 @@
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequest

import java.util.UUID

import cats.{Id, Monad}
import cats.Monad
import cats.data.{EitherT, NonEmptyList, ValidatedNel}
import cats.effect.Clock
import cats.implicits._

import cats.effect.Sync

import io.circe._
import io.circe.generic.auto._

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}
import com.snowplowanalytics.lrumap._
import com.snowplowanalytics.snowplow.badrows.FailureDetails
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, ParseableEnrichment}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ApiRequestConf
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequest.ApiRequestEnrichment.ApiRequestEvaluator
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{CachingEvaluator, Enrichment, ParseableEnrichment}
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.{CirceUtils, HttpClient}
import io.circe._
import io.circe.generic.auto._

import java.util.UUID

object ApiRequestEnrichment extends ParseableEnrichment {

type ApiRequestEvaluator[F[_]] = CachingEvaluator[F, String, Json]

override val supportedSchema =
SchemaCriterion(
"com.snowplowanalytics.snowplow.enrichments",
Expand Down Expand Up @@ -91,13 +91,12 @@ object ApiRequestEnrichment extends ParseableEnrichment {
CreateApiRequestEnrichment[F].create(conf)
}

final case class ApiRequestEnrichment[F[_]: Monad: HttpClient](
final case class ApiRequestEnrichment[F[_]: Monad: HttpClient: Clock](
schemaKey: SchemaKey,
inputs: List[Input],
api: HttpApi,
outputs: List[Output],
ttl: Int,
cache: LruMap[F, String, (Either[Throwable, Json], Long)]
apiRequestEvaluator: ApiRequestEvaluator[F]
) extends Enrichment {
import ApiRequestEnrichment._

Expand Down Expand Up @@ -174,28 +173,19 @@ final case class ApiRequestEnrichment[F[_]: Monad: HttpClient](
output: Output
): F[Either[Throwable, Json]] =
for {
key <- Monad[F].pure(cacheKey(url, body))
gotten <- cache.get(key)
res <- gotten match {
case Some(response) =>
if (System.currentTimeMillis() / 1000 - response._2 < ttl) Monad[F].pure(response._1)
else put(key, url, body, output)
case None => put(key, url, body, output)
}
extracted = res.flatMap(output.extract)
result <- apiRequestEvaluator.evaluateForKey(key = cacheKey(url, body), getResult = () => callApi(url, body, output))
extracted = result.flatMap(output.extract)
described = extracted.map(output.describeJson)
} yield described

private def put(
key: String,
private def callApi(
url: String,
body: Option[String],
output: Output
): F[Either[Throwable, Json]] =
for {
response <- api.perform[F](url, body)
json = response.flatMap(output.parseResponse)
_ <- cache.put(key, (json, System.currentTimeMillis() / 1000))
} yield json

private def failureDetails(errors: NonEmptyList[String]) =
Expand All @@ -212,43 +202,28 @@ sealed trait CreateApiRequestEnrichment[F[_]] {
object CreateApiRequestEnrichment {
def apply[F[_]](implicit ev: CreateApiRequestEnrichment[F]): CreateApiRequestEnrichment[F] = ev

implicit def idCreateApiRequestEnrichment(
implicit CLM: CreateLruMap[Id, String, (Either[Throwable, Json], Long)],
HTTP: HttpClient[Id]
): CreateApiRequestEnrichment[Id] =
new CreateApiRequestEnrichment[Id] {
override def create(conf: ApiRequestConf): Id[ApiRequestEnrichment[Id]] =
CLM
.create(conf.cache.size)
.map(c =>
ApiRequestEnrichment(
conf.schemaKey,
conf.inputs,
conf.api,
conf.outputs,
conf.cache.ttl,
c
)
)
}

implicit def syncCreateApiRequestEnrichment[F[_]: Sync](
implicit CLM: CreateLruMap[F, String, (Either[Throwable, Json], Long)],
HTTP: HttpClient[F]
implicit def instance[F[_]: Monad: HttpClient: Clock](
implicit CLM: CreateLruMap[F, String, CachingEvaluator.CachedItem[Json]]
): CreateApiRequestEnrichment[F] =
new CreateApiRequestEnrichment[F] {
def create(conf: ApiRequestConf): F[ApiRequestEnrichment[F]] =
CLM
.create(conf.cache.size)
.map(c =>
def create(conf: ApiRequestConf): F[ApiRequestEnrichment[F]] = {
val cacheConfig = CachingEvaluator.Config(
size = conf.cache.size,
successTtl = conf.cache.ttl,
errorTtl = conf.cache.ttl / 10
)

CachingEvaluator
.create[F, String, Json](cacheConfig)
.map { evaluator =>
ApiRequestEnrichment(
conf.schemaKey,
conf.inputs,
conf.api,
conf.outputs,
conf.cache.ttl,
c
evaluator
)
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery

import cats.Monad
import cats.effect.Clock
import cats.implicits._

import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.CachingEvaluator
import com.zaxxer.hikari.HikariDataSource

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.SqlQueryConf
import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, ResourceF}
import io.circe.Json

import scala.collection.immutable.IntMap

/** Initialize resources, necessary for SQL Query enrichment: cache and connection */
sealed trait CreateSqlQueryEnrichment[F[_]] {
Expand All @@ -29,24 +33,32 @@ object CreateSqlQueryEnrichment {

def apply[F[_]](implicit ev: CreateSqlQueryEnrichment[F]): CreateSqlQueryEnrichment[F] = ev

implicit def createSqlQueryEnrichment[F[_]: DbExecutor: Monad: ResourceF](
implicit def createSqlQueryEnrichment[F[_]: DbExecutor: Monad: ResourceF: Clock](
implicit CLM: SqlCacheInit[F]
): CreateSqlQueryEnrichment[F] =
new CreateSqlQueryEnrichment[F] {
def create(conf: SqlQueryConf, blocker: BlockerF[F]): F[SqlQueryEnrichment[F]] =
for {
cache <- CLM.create(conf.cache.size)
} yield SqlQueryEnrichment(
conf.schemaKey,
conf.inputs,
conf.db,
conf.query,
conf.output,
conf.cache.ttl,
cache,
blocker,
getDataSource(conf.db)
def create(conf: SqlQueryConf, blocker: BlockerF[F]): F[SqlQueryEnrichment[F]] = {
val cacheConfig = CachingEvaluator.Config(
size = conf.cache.size,
successTtl = conf.cache.ttl,
errorTtl = conf.cache.ttl / 10
)

CachingEvaluator
.create[F, IntMap[Input.ExtractedValue], List[SelfDescribingData[Json]]](cacheConfig)
.map { evaluator =>
SqlQueryEnrichment(
conf.schemaKey,
conf.inputs,
conf.db,
conf.query,
conf.output,
evaluator,
blocker,
getDataSource(conf.db)
)
}
}
}

private def getDataSource(rdbms: Rdbms): HikariDataSource = {
Expand Down
Loading

0 comments on commit bb71d94

Please sign in to comment.