Skip to content

Commit

Permalink
Loader: make the part appended to folder monitoring staging path conf…
Browse files Browse the repository at this point in the history
…igurable (close #969)
  • Loading branch information
spenes committed Jul 18, 2022
1 parent 5ae6397 commit e367d37
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@ object Config {
final case class StatsD(hostname: String, port: Int, tags: Map[String, String], prefix: Option[String])
final case class Stdout(prefix: Option[String])
final case class Webhook(endpoint: Uri, tags: Map[String, String])
final case class Folders(period: FiniteDuration, staging: S3.Folder, since: Option[FiniteDuration], transformerOutput: S3.Folder, until: Option[FiniteDuration], failBeforeAlarm: Option[Int])
final case class Folders(period: FiniteDuration,
staging: S3.Folder,
since: Option[FiniteDuration],
transformerOutput: S3.Folder,
until: Option[FiniteDuration],
failBeforeAlarm: Option[Int],
appendStagingPath: Option[Boolean])
final case class RetryQueue(period: FiniteDuration, size: Int, maxAttempts: Int, interval: FiniteDuration)
final case class Timeouts(loading: FiniteDuration, nonLoading: FiniteDuration, sqsVisibility: FiniteDuration)
final case class Retries(strategy: Strategy, attempts: Option[Int], backoff: FiniteDuration, cumulativeBound: Option[FiniteDuration])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,14 @@ object FolderMonitoring {
.realTime(TimeUnit.MILLISECONDS)
.map(Instant.ofEpochMilli)
.map(LogTimeFormatter.format)
.map(time => folders.staging.append("shredded").append(time))
.map { time =>
val stagingPath =
if (folders.appendStagingPath.getOrElse(true))
folders.staging.append("shredded")
else
folders.staging
stagingPath.append(time)
}

Stream.eval(getKey) ++ Stream.fixedDelay[F](folders.period).evalMap(_ => getKey)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ object ConfigSpec {
Some(Config.Sentry(URI.create("http://sentry.acme.com"))),
Config.Metrics(Some(Config.StatsD("localhost", 8125, Map("app" -> "rdb-loader"), None)), Some(Config.Stdout(None)), 5.minutes),
Some(Config.Webhook(uri"https://webhook.acme.com", Map("pipeline" -> "production"))),
Some(Config.Folders(1.hour, S3.Folder.coerce("s3://acme-snowplow/loader/logs/"), Some(14.days), S3.Folder.coerce("s3://acme-snowplow/loader/transformed/"), Some(7.days), Some(3))),
Some(Config.Folders(1.hour, S3.Folder.coerce("s3://acme-snowplow/loader/logs/"), Some(14.days), S3.Folder.coerce("s3://acme-snowplow/loader/transformed/"), Some(7.days), Some(3), None)),
Some(Config.HealthCheck(20.minutes, 15.seconds)),
)
val defaultMonitoring = Config.Monitoring(None, None, Config.Metrics(None, Some(Config.Stdout(None)), 5.minutes), None, None, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class FolderMonitoringSpec extends Specification {
"produce new keys with interval" in {
implicit val T = IO.timer(scala.concurrent.ExecutionContext.global)
val result = FolderMonitoring
.getOutputKeys[IO](Config.Folders(1.second, S3.Folder.coerce("s3://acme/logs/"), None, S3.Folder.coerce("s3://acme/shredder-output/"), None, Some(3)))
.getOutputKeys[IO](Config.Folders(1.second, S3.Folder.coerce("s3://acme/logs/"), None, S3.Folder.coerce("s3://acme/shredder-output/"), None, Some(3), Some(true)))
.take(2)
.compile
.toList
Expand Down

0 comments on commit e367d37

Please sign in to comment.