Transformer Pubsub: support Parquet output option (close #1124)
spenes committed Nov 18, 2022
1 parent 429cca1 commit c1d786f
Showing 8 changed files with 424 additions and 16 deletions.
4 changes: 4 additions & 0 deletions build.sbt
@@ -123,6 +123,7 @@ lazy val transformerKinesis = project
.in(file("modules/transformer-kinesis"))
.settings(BuildSettings.transformerKinesisBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.transformerKinesisDependencies)
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", aws % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)
@@ -132,6 +133,7 @@ lazy val transformerKinesisDistroless = project
.settings(sourceDirectory := (transformerKinesis / sourceDirectory).value)
.settings(BuildSettings.transformerKinesisDistrolessBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.transformerKinesisDependencies)
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", aws % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin, LauncherJarPlugin)
@@ -140,6 +142,7 @@ lazy val transformerPubsub = project
.in(file("modules/transformer-pubsub"))
.settings(BuildSettings.transformerPubsubBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.transformerPubsubDependencies)
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", gcp % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)
@@ -149,6 +152,7 @@ lazy val transformerPubsubDistroless = project
.settings(sourceDirectory := (transformerPubsub / sourceDirectory).value)
.settings(BuildSettings.transformerPubsubDistrolessBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.transformerPubsubDependencies)
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", gcp % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin, LauncherJarPlugin)
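The four added lines all reference Dependencies.transformerKinesisDependencies and Dependencies.transformerPubsubDependencies, which are defined in the project's Dependencies object and not part of the hunks shown here. As a rough sketch of what such values could look like, assuming Parquet output pulls in parquet4s plus the Hadoop client it relies on (the artifacts and versions below are illustrative placeholders, not the repository's actual definitions):

import sbt._

object Dependencies {
  // Placeholder versions, for illustration only
  val parquet4sVersion = "2.6.0"
  val hadoopVersion    = "3.3.4"

  // Libraries a Parquet sink typically needs: a Scala Parquet codec plus the Hadoop client
  val transformerPubsubDependencies: Seq[ModuleID] = Seq(
    "com.github.mjakubowski84" %% "parquet4s-fs2" % parquet4sVersion,
    "org.apache.hadoop"         % "hadoop-client" % hadoopVersion
  )

  // The Kinesis transformer gets the same Parquet support
  val transformerKinesisDependencies: Seq[ModuleID] = transformerPubsubDependencies
}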
5 changes: 1 addition & 4 deletions config/loader/aws/databricks.config.reference.hocon
@@ -8,9 +8,6 @@
# SQS topic name used by Transformer and Loader to communicate
"messageQueue": "test-queue",

-# Optional. S3 path that holds JSONPaths
-"jsonpaths": "s3://bucket/jsonpaths/",
-
# Warehouse connection details
"storage" : {
# Hostname of Databricks cluster
@@ -83,7 +80,7 @@
"monitoring": {
# Snowplow tracking (optional)
"snowplow": {
"appId": "redshift-loader",
"appId": "databricks-loader",
"collector": "snplow.acme.com",
},

13 changes: 13 additions & 0 deletions config/loader/gcp/databricks.config.minimal.hocon
@@ -0,0 +1,13 @@
{
"messageQueue": {
"type": "pubsub"
"subscription": "projects/project-id/subscriptions/subscription-id"
},
"storage" : {
"host": "abc.cloud.databricks.com"
"password": "Supersecret1"
"schema": "atomic",
"port": 443,
"httpPath": "/databricks/http/path",
}
}
229 changes: 229 additions & 0 deletions config/loader/gcp/databricks.config.reference.hocon
@@ -0,0 +1,229 @@
{
# Pubsub subscription used by Transformer and Loader to communicate
"messageQueue": {
"type": "pubsub"
"subscription": "projects/project-id/subscriptions/subscription-id"
},

# Warehouse connection details
"storage" : {
# Hostname of Databricks cluster
"host": "abc.cloud.databricks.com",
# DB password
"password": {
# A password can be placed in the EC2 parameter store or GCP Secret Manager, or be plain text
"secretStore": {
"parameterName": "snowplow.databricks.password"
}
},
# Optional. Override the Databricks default catalog, e.g. with a Unity catalog name.
"catalog": "hive_metastore",
# DB schema
"schema": "atomic",
# Database port
"port": 443,
# Http Path of Databricks cluster
"httpPath": "/databricks/http/path",
# User agent name for the Databricks connection. Optional; the default value is "snowplow-rdbloader-oss"
"userAgent": "snowplow-rdbloader-oss"

# Optimize period per table, used as a predicate for the OPTIMIZE command.
"eventsOptimizePeriod": "2 days"
},

"schedules": {
# Periodic schedules to stop loading, e.g. for a warehouse maintenance window
# Any number of schedules is supported, but it is recommended not to overlap them
# The schedule uses the machine's local timezone (UTC is recommended)
"noOperation": [
{
# Human-readable name of the no-op window
"name": "Maintenance window",
# Cron expression with second granularity
"when": "0 0 12 * * ?",
# For how long the loader should be paused
"duration": "1 hour"
}
],
# The loader runs periodic OPTIMIZE statements to prevent the number of files behind Delta tables from growing.
"optimizeEvents": "0 0 0 ? * *",
"optimizeManifest": "0 0 5 ? * *"
}

# Observability and reporting options
"monitoring": {
# Snowplow tracking (optional)
"snowplow": {
"appId": "databricks-loader",
"collector": "snplow.acme.com",
},

# An endpoint for alerts and informational events
# Everything sent to the Snowplow collector (as properly formed self-describing events)
# will also be sent to the webhook as POST payloads with self-describing JSONs
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
},

# Optional, for tracking runtime exceptions
"sentry": {
"dsn": "http://sentry.acme.com"
},

# Optional, configure how metrics are reported
"metrics": {
# Optional, send metrics to StatsD server
"statsd": {
"hostname": "localhost",
"port": 8125,
# Any key-value pairs to be tagged on every StatsD metric
"tags": {
"app": "rdb-loader"
}
# Optional, override the default metric prefix
# "prefix": "snowplow.rdbloader."
},

# Optional, print metrics on stdout (with slf4j)
"stdout": {
# Optional, override the default metric prefix
# "prefix": "snowplow.rdbloader."
}

# Optional, period for metrics emitted periodically
# Default value 5 minutes
# There is only one periodic metric at the moment.
# This metric is minimum_age_of_loaded_data.
# It specifies how old the latest event in the warehouse is.
"period": "5 minutes"
},

# Optional, configuration for periodic unloaded/corrupted folders checks
"folders": {
# Path where the Loader can store auxiliary logs
# The Loader should be able to write here, and the warehouse should be able to load from here
"staging": "gs://acme-snowplow/loader/logs/",
# How often to check
"period": "1 hour"
# Only check folders newer than this (how far back in time folder monitoring goes)
"since": "14 days"
# Only check folders older than this (more recent folders are skipped)
"until": "7 days"
# Path to transformer archive (must be same as Transformer's `output.path`)
"transformerOutput": "gs://acme-snowplow/loader/transformed/"
# How many times the check can fail before generating an alarm instead of a warning
"failBeforeAlarm": 3
},

# Periodic DB health check, raising a warning if the DB hasn't responded to `SELECT 1`
"healthCheck": {
# How often to query the DB
"frequency": "20 minutes",
# How long to wait for a response
"timeout": "15 seconds"
}
},

# Immediate retries configuration
# Unlike retryQueue, these retries happen immediately, without proceeding to another message
"retries": {
# Starting backoff period
"backoff": "30 seconds"
# A strategy to use when deciding on next backoff
"strategy": "EXPONENTIAL"
# How many attempts to make before sending the message into the retry queue
# If missing, the loader will keep retrying until the cumulative bound is reached
"attempts": 3,
# When backoff reaches this delay the Loader will stop retrying
# If both cumulativeBound and attempts are missing, the loader will retry infinitely
"cumulativeBound": "1 hour"
},

# Check the target destination to make sure it is ready.
# The check is retried until the target is ready, blocking the application in the meantime
"readyCheck": {
# Starting backoff period
"backoff": "15 seconds"
# A strategy to use when deciding on next backoff
"strategy": "CONSTANT"
},

# Retry configuration for the initialization block
# All exceptions thrown there will be retried
"initRetries": {
# Starting backoff period
"backoff": "30 seconds"
# A strategy to use when deciding on next backoff
"strategy": "EXPONENTIAL"
# How many attempts to make before sending the message into the retry queue
# If missing, the loader will keep retrying until the cumulative bound is reached
"attempts": 3,
# When backoff reaches this delay the Loader will stop retrying
# If both cumulativeBound and attempts are missing, the loader will retry infinitely
"cumulativeBound": "1 hour"
},

# Additional backlog of recently failed folders that can be automatically retried
# The Retry Queue saves a failed folder and then re-reads the info from the shredding_complete file
"retryQueue": {
# How often batch of failed folders should be pulled into a discovery queue
"period": "30 minutes",
# How many failures should be kept in memory
# After the limit is reached new failures are dropped
"size": 64,
# How many attempts to make for each folder
# After the limit is reached the folder is no longer retried
"maxAttempts": 3,
# Artificial pause after each failed folder is added to the queue
"interval": "5 seconds"
},

"timeouts": {
# How long loading (the actual COPY statements) can take before considering the warehouse unhealthy
# Without any progress (i.e. a different subfolder) within this period, the loader
# will abort the transaction
# If the 'TempCreds' load auth method is used, this value will be used as the session duration
# of the temporary credentials. In that case, it can't be greater than the maximum session duration
# of the IAM role used for temporary credentials
"loading": "1 hour",

# How long non-loading steps (such as ALTER TABLE or metadata queries) can take
# before considering the warehouse unhealthy
"nonLoading": "10 minutes"
}

# Optional. Configure telemetry
# All the fields are optional
"telemetry": {
# Set to true to disable telemetry
"disable": false
# Interval for the heartbeat event
"interval": 15 minutes
# HTTP method used to send the heartbeat event
"method": "POST"
# URI of the collector receiving the heartbeat event
"collectorUri": "collector-g.snowplowanalytics.com"
# Port of the collector receiving the heartbeat event
"collectorPort": 443
# Whether to use https or not
"secure": true
# Identifier intended to tie events together across modules,
# infrastructure and apps when used consistently
"userProvidedId": "my_pipeline"
# ID automatically generated upon running a modules deployment script
# Intended to identify each independent module, and the infrastructure it controls
"autoGeneratedId": "hfy67e5ydhtrd"
# Unique identifier for the VM instance
# Unique for each instance of the app running within a module
"instanceId": "665bhft5u6udjf"
# Name of the terraform module that deployed the app
"moduleName": "rdb-loader-ce"
# Version of the terraform module that deployed the app
"moduleVersion": "1.0.0"
}
}
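To make the retries/initRetries semantics above concrete, here is a toy Scala sketch of the backoff schedule for the reference values. It assumes the EXPONENTIAL strategy doubles the delay on each attempt and stops at whichever of attempts or cumulativeBound is reached first; this is an illustration, not the loader's actual retry code.

import scala.concurrent.duration._

object RetryScheduleSketch {
  // Exponential backoff: start at `backoff`, double each time, stop once the delay
  // would exceed `cumulativeBound` or `attempts` retries have been scheduled
  def schedule(backoff: FiniteDuration, attempts: Int, cumulativeBound: FiniteDuration): List[FiniteDuration] =
    Iterator
      .iterate(backoff)(_ * 2)
      .takeWhile(_ <= cumulativeBound)
      .take(attempts)
      .toList

  def main(args: Array[String]): Unit =
    // Reference values: backoff 30 seconds, EXPONENTIAL strategy, 3 attempts, 1 hour bound
    println(schedule(30.seconds, 3, 1.hour)) // List(30 seconds, 60 seconds, 120 seconds)
}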
@@ -35,6 +35,12 @@
"topic": "projects/project-id/topics/topic-id"
}

"formats": {
# Optional. Denotes the output file format.
# Possible values are 'json' and 'parquet'. The default is 'json'.
"fileFormat": "json"
}

# Events will be validated against the given criteria and
# a bad row will be created if validation is not successful
"validations": {
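The new formats.fileFormat option above accepts the strings 'json' and 'parquet'. As a hedged sketch of how such a string option is typically decoded into an ADT with circe (the FileFormat type and decoder below are illustrative assumptions, not the project's actual code):

import io.circe.Decoder

sealed trait FileFormat
object FileFormat {
  case object Json extends FileFormat
  case object Parquet extends FileFormat

  // Case-insensitive decoder: "json" -> Json, "parquet" -> Parquet, anything else fails
  implicit val fileFormatDecoder: Decoder[FileFormat] =
    Decoder[String].emap {
      _.toLowerCase match {
        case "json"    => Right(Json)
        case "parquet" => Right(Parquet)
        case other     => Left(s"'$other' is not a supported file format ('json' or 'parquet' expected)")
      }
    }
}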
@@ -259,15 +259,6 @@ object Config {

private def validateConfig(config: Config): List[String] =
List(
-TransformerConfig.formatsCheck(config.formats).swap.map(List(_)).getOrElse(List.empty),
-gcpFormatCheck(config)
+TransformerConfig.formatsCheck(config.formats).swap.map(List(_)).getOrElse(List.empty)
).flatten

-private def gcpFormatCheck(config: Config): List[String] =
-(config.input, config.formats) match {
-case (_: Config.StreamInput.Pubsub, TransformerConfig.Formats.WideRow.PARQUET) =>
-List("Parquet file format can't be used with transformer-pubsub")
-case _ => Nil
-}

}
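The net effect of this Config.scala change: gcpFormatCheck, which rejected the Parquet format for transformer-pubsub, is removed, so validateConfig reduces to the generic formats check. A self-contained toy sketch of that behaviour, using simplified stand-in types rather than the project's own:

object ValidationSketch {
  sealed trait FileFormat
  case object JsonFormat extends FileFormat
  case object ParquetFormat extends FileFormat

  // Stand-in for TransformerConfig.formatsCheck: both formats are acceptable
  def formatsCheck(format: FileFormat): Either[String, Unit] = Right(())

  // After this commit only the generic check runs; the Pubsub-specific rejection
  // of Parquet ("Parquet file format can't be used with transformer-pubsub") is gone
  def validateConfig(format: FileFormat): List[String] =
    formatsCheck(format).swap.toOption.toList

  def main(args: Array[String]): Unit =
    println(validateConfig(ParquetFormat)) // List() -> Parquet now passes validation
}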