
feat: bigquery Sink using depot #179

Merged
merged 12 commits into from
Aug 5, 2022
29 changes: 14 additions & 15 deletions build.gradle
@@ -33,7 +33,7 @@ lombok {
}

group 'io.odpf'
version '0.3.3'
version '0.4.0'

def projName = "firehose"

@@ -48,7 +48,6 @@ repositories {
}
}


private Properties loadEnv() {
Properties properties = new Properties()
properties.load(new FileInputStream(file("${projectDir}/env/local.properties")));
@@ -86,7 +85,7 @@ dependencies {
exclude group: "log4j", module: "log4j"
}
implementation 'io.confluent:monitoring-interceptors:3.0.0'
implementation "io.grpc:grpc-all:1.18.0"
implementation "io.grpc:grpc-all:1.38.0"
implementation group: 'org.jfrog.buildinfo', name: 'build-info-extractor', version: '2.6.3'
implementation group: 'com.google.gradle', name: 'osdetector-gradle-plugin', version: '1.2.1'
implementation group: 'org.apache.ivy', name: 'ivy', version: '2.2.0'
@@ -102,12 +101,12 @@ dependencies {
implementation 'com.google.cloud:google-cloud-storage:1.114.0'
implementation 'com.google.cloud:google-cloud-bigquery:1.115.0'
implementation 'org.apache.logging.log4j:log4j-core:2.17.1'

implementation group: 'io.odpf', name: 'depot', version: '0.1.7'
implementation(group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59') {
    exclude group: 'org.slf4j'
}

testImplementation group: 'junit', name: 'junit', version: '4.11'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'org.mockito:mockito-core:2.0.99-beta'
testImplementation 'org.mockito:mockito-core:4.5.1'
testImplementation "com.github.tomakehurst:wiremock:2.3.1"
testImplementation group: 'io.opentracing', name: 'opentracing-mock', version: '0.33.0'
testImplementation group: 'org.mock-server', name: 'mockserver-netty', version: '3.10.5'
@@ -133,7 +132,7 @@ protobuf {
task.generateDescriptorSet = true
task.descriptorSetOptions.includeSourceInfo = false
task.descriptorSetOptions.includeImports = true
task.descriptorSetOptions.path = "$projectDir/src/test/resources/__files/descriptors.bin"
}
}
}
@@ -157,17 +156,17 @@ test {
clean {
delete "$projectDir/src/test/resources/__files"
}

jar {
manifest {
attributes 'Main-Class': 'io.odpf.firehose.launch.Main'
duplicatesStrategy = 'exclude'
}
from {
configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
}
manifest {
attributes 'Main-Class': 'io.odpf.firehose.launch.Main'
duplicatesStrategy = 'exclude'
zip64 = true
}
from {
configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
}
exclude('META-INF/*.RSA', 'META-INF/*.SF', 'META-INF/*.DSA')
}

publishing {
publications {
maven(MavenPublication) {
Binary file modified docs/docs/advance/generic.md
Binary file not shown.
17 changes: 12 additions & 5 deletions docs/docs/concepts/architecture.md
@@ -40,6 +40,7 @@ _**Sink**_
- All the existing sink types follow the same contract/lifecycle defined in `AbstractSink.java`. It consists of two stages:
  - Prepare: Transforms the list of filtered messages into the sink-specific insert/update client requests.
  - Execute: Requests created in the Prepare stage are executed at this step and a list of failed messages is returned \(if any\) for retry.
- An implementation of AbstractSink can use the sink implementations provided by [depot](https://github.com/odpf/depot).
- If the batch has any failures, Firehose retries pushing the failed messages to the sink.

_**SinkPool**_
@@ -70,14 +71,20 @@ The final state of message can be any one of the followings after it is consumed
One can monitor via plotting the metrics related to messages.

### Schema Handling
- In case `INPUT_SCHEMA_DATA_TYPE` is set to `protobuf`:
  - Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. Data streams on Kafka topics are bound to a protobuf schema.
  - Firehose deserializes the data consumed from the topics using the Protobuf descriptors generated out of the artifacts. The artifactory is an HTTP interface from which Firehose fetches these descriptors for deserialization.
  - Schema handling, i.e. finding the mapped schema for the topic, downloading the descriptors, and being dynamically notified of/updated with the latest schema, is abstracted through the Stencil library.

  The Stencil is a proprietary library that provides an abstraction layer for schema handling.

  Schema caching, dynamic schema updates, etc. are features of the Stencil client library.
- In case `INPUT_SCHEMA_DATA_TYPE` is set to `json`:
  - Currently this config is only supported in the Bigquery sink.
  - For JSON, the Bigquery sink infers the schema dynamically from the incoming data; in the future we plan to provide JSON schema support via Stencil. A minimal config sketch for both modes follows this list.
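
The two modes are selected purely through configuration. Below is a minimal, hypothetical sketch; only `INPUT_SCHEMA_DATA_TYPE` and `INPUT_SCHEMA_PROTO_CLASS` are taken from these docs, and the values shown are illustrative.

```text
# Protobuf input: messages are deserialized using Stencil-provided descriptors
INPUT_SCHEMA_DATA_TYPE=protobuf
INPUT_SCHEMA_PROTO_CLASS=io.odpf.firehose.consumer.TestMessage

# JSON input: the schema is inferred dynamically from the incoming data
# INPUT_SCHEMA_DATA_TYPE=json
```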



## Firehose Integration

4 changes: 2 additions & 2 deletions docs/docs/concepts/filters.md
@@ -48,11 +48,11 @@ You can read more about JSON Schema [here](https://json-schema.org/). For more d

The filtering occurs in the following steps -

- JSON filter configurations are validated and logged to instrumentation by JsonFilterUtil. In case any configuration is invalid, then IllegalArgumentException is thrown and Firehose is terminated.
- JSON filter configurations are validated and logged to `firehoseInstrumentation` by `JsonFilterUtil`. If any configuration is invalid, an `IllegalArgumentException` is thrown and Firehose is terminated.
- If `FILTER_ESB_MESSAGE_FORMAT=PROTOBUF`, then the serialized key/message protobuf byte array is deserialized to POJO object by the Proto schema class. It is then converted to a JSON string so that it can be parsed by the JSON Schema Validator.
- If`FILTER_ESB_MESSAGE_FORMAT=JSON`, then the serialized JSON byte array is deserialized to a JSON message string.
- The JSON Schema validator performs a validation on the JSON message against the filter rules specified in the JSON Schema string provided in the environment variable`FILTER_JSON_SCHEMA.`
- If there are any validation errors, then that key/message is filtered out and the validation errors are logged to the instrumentation in debug mode.
- If there are any validation errors, then that key/message is filtered out and the validation errors are logged to `firehoseInstrumentation` in debug mode (a sample filter configuration is sketched after this list).
- If all validation checks pass, then the key/message is added to the ArrayList of filtered messages and returned by the JsonFilter.
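
As an illustration, here is a hedged sketch of a JSON filter configuration. Only `FILTER_ESB_MESSAGE_FORMAT` and `FILTER_JSON_SCHEMA` are taken from the steps above; the schema itself is a made-up example that keeps only messages whose `order_number` equals `"1253"`.

```text
# Filter JSON messages; anything that fails the schema validation is dropped
FILTER_ESB_MESSAGE_FORMAT=JSON
FILTER_JSON_SCHEMA={"properties":{"order_number":{"const":"1253"}}}
```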

## Why Use Filters
2 changes: 1 addition & 1 deletion docs/docs/contribute/contribution.md
@@ -30,7 +30,7 @@ To help you get your feet wet and get you familiar with our contribution process
We use RFCs and GitHub issues to communicate ideas.

- You can report a bug or suggest a feature enhancement or can just ask questions. Reach out on Github discussions for this purpose.
- You are also welcome to add a new sink, improve monitoring and logging and improve code quality.
- You are also welcome to add a new common sink in [depot](https://github.com/odpf/depot), improve monitoring and logging, and improve code quality.
- You can help with documenting new features or improve existing documentation.
- You can also review and accept other contributions if you are a maintainer.

3 changes: 2 additions & 1 deletion docs/docs/contribute/development.md
@@ -37,7 +37,7 @@ Configuration parameter variables of each sink can be found in the [Configuratio

### Schema Registry

Firehose uses Stencil Server as its Schema Registry for hosting Protobuf descriptors. The environment variable `SCHEMA_REGISTRY_STENCIL_ENABLE` must be set to `true` . Stencil server URL must be specified in the variable `SCHEMA_REGISTRY_STENCIL_URLS` . The Proto Descriptor Set file of the Kafka messages must be uploaded to the Stencil server.
When `INPUT_SCHEMA_DATA_TYPE` is set to `protobuf`, Firehose uses the Stencil server as its schema registry for hosting Protobuf descriptors. The environment variable `SCHEMA_REGISTRY_STENCIL_ENABLE` must be set to `true`. The Stencil server URL must be specified in the variable `SCHEMA_REGISTRY_STENCIL_URLS`. The Proto descriptor set file of the Kafka messages must be uploaded to the Stencil server.
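
For example, a hedged local.properties sketch; the variable names come from this paragraph, while the URL is a placeholder for wherever your Stencil server serves the descriptor set.

```text
# Enable the Stencil-backed schema registry and point Firehose at the descriptor set
SCHEMA_REGISTRY_STENCIL_ENABLE=true
SCHEMA_REGISTRY_STENCIL_URLS=http://localhost:8000/<path-to-proto-descriptor-set>
INPUT_SCHEMA_DATA_TYPE=protobuf
```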

Refer [this guide](https://github.com/odpf/stencil/tree/master/server#readme) on how to set up and configure the Stencil server, and how to generate and upload Proto descriptor set file to the server.

@@ -71,6 +71,7 @@ Set the generic variables in the local.properties file.
```text
KAFKA_RECORD_PARSER_MODE = message
SINK_TYPE = log
INPUT_SCHEMA_DATA_TYPE=protobuf
INPUT_SCHEMA_PROTO_CLASS = io.odpf.firehose.consumer.TestMessage
```
Set the variables which specify the kafka server, topic name, and group-id of the kafka consumer - the standard values are used here.
7 changes: 5 additions & 2 deletions docs/docs/guides/create_firehose.md
@@ -14,6 +14,7 @@ SOURCE_KAFKA_TOPIC=test-topic
KAFKA_RECOED_CONSUMER_GROUP_ID=sample-group-id
KAFKA_RECORD_PARSER_MODE=message
SINK_TYPE=log
INPUT_SCHEMA_DATA_TYPE=protobuf
INPUT_SCHEMA_PROTO_CLASS=com.tests.TestMessage
```

@@ -129,8 +130,10 @@ _**Note:**_ [_**DATABASE**_](../sinks/influxdb-sink.md#sink_influx_db_name) _**a
## Create a Bigquery sink

- It requires the following [variables](../sinks/bigquery-sink.md) to be set.
- This sink will generate bigquery schema from protobuf message schema and update bigquery table with the latest generated schema.
- The protobuf message of a `google.protobuf.Timestamp` field might be needed when table partitioning is enabled.
- For `INPUT_SCHEMA_DATA_TYPE = protobuf`, this sink generates the Bigquery schema from the protobuf message schema and updates the Bigquery table with the latest generated schema.
  - A `google.protobuf.Timestamp` field might be needed in the protobuf message when table partitioning is enabled.
- For `INPUT_SCHEMA_DATA_TYPE = json`, this sink generates the Bigquery schema by inferring it from the incoming JSON. In the future we will also add support for JSON schemas coming from Stencil.
  - A timestamp column is needed in case of a partitioned table. It can be generated at ingestion time by setting the config `SINK_BIGQUERY_ADD_EVENT_TIMESTAMP_ENABLE`; please refer to the [depot bigquery sink config section](https://github.com/odpf/depot/blob/main/docs/reference/configuration/bigquery-sink.md#sink_bigquery_add_event_timestamp_enable). A minimal config sketch follows this list.
- Google Cloud credentials with the required Bigquery permissions are needed to run this sink.
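
As an illustration, a hedged properties sketch for a Bigquery sink with JSON input; `SINK_TYPE`, `INPUT_SCHEMA_DATA_TYPE` and `SINK_BIGQUERY_ADD_EVENT_TIMESTAMP_ENABLE` appear in these docs, and the full variable list lives in the depot configuration reference linked above.

```text
SINK_TYPE=bigquery
INPUT_SCHEMA_DATA_TYPE=json
# Adds an ingestion-time timestamp that can back the partitioning column
SINK_BIGQUERY_ADD_EVENT_TIMESTAMP_ENABLE=true
```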

If you'd like to connect to a sink which is not yet supported, you can create a new sink by following the [contribution guidelines](../contribute/contribution.md)
5 changes: 5 additions & 0 deletions docs/docs/introduction.md
@@ -19,6 +19,11 @@ Discover why users choose Firehose as their main Kafka Consumer
- **Runtime** Firehose can run inside containers or VMs in a fully managed runtime environment like Kubernetes.
- **Metrics** Always know what’s going on with your deployment with built-in monitoring of throughput, response times, errors, and more.

## Supported incoming data types from Kafka
- [Protobuf](https://developers.google.com/protocol-buffers)
- [JSON](https://www.json.org/json-en.html)
  - Support is currently limited to the Bigquery, Elasticsearch and MongoDB sinks. Support for other sinks will be added in the future.

## Supported Sinks:

Following sinks are supported in the Firehose
4 changes: 2 additions & 2 deletions docs/docs/reference/core-faqs.md
@@ -49,7 +49,7 @@ Firehose provides various Kafka client configurations. Refer [Generic Configurat

## What all data formats are supported ?

Elasticsearch and MongoDB sink support both JSON and Protobuf as the input schema. For other sinks, we currently support only Protobuf. Support for JSON and Avro is planned and incorporated in our roadmap. Please refer to our Roadmap section for more details.
Elasticsearch, Bigquery and MongoDB sinks support both JSON and Protobuf as the input schema. For other sinks, we currently support only Protobuf. Support for JSON and Avro is planned and incorporated in our roadmap. Please refer to our Roadmap section for more details.

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. Data streams on Kafka topics are bound to a Protobuf schema.

@@ -146,7 +146,7 @@ No, all fields from the input key/message will be sent by Firehose to the Sink.

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. Data streams on Kafka topics are bound to a Protobuf schema. Protobuf is much more lightweight than other schema formats like JSON, since it encodes the keys in the message to integers.

Elasticsearch and MongoDB sink support both JSON and Protobuf as the input schema.
Elasticsearch, Bigquery and MongoDB sinks support both JSON and Protobuf as the input schema.

For other sinks, we currently support only Protobuf. Support for JSON and Avro is planned and incorporated in our roadmap. Please refer to our Roadmap section for more details.

5 changes: 3 additions & 2 deletions docs/docs/reference/faq.md
@@ -771,15 +771,16 @@ section.

#### What all data formats are supported?

ElasticSearch and MongoDB sink support both JSON and Protobuf as the input schema. For other sinks, we currently support
ElasticSearch, Bigquery and MongoDB sinks support both JSON and Protobuf as the input schema. For other sinks, we currently support
only Protobuf. Support for JSON and Avro is planned and incorporated in the roadmap.

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serialising structured data.
Data streams on Kafka topics are bound to a Protobuf schema. Follow the instructions
When `INPUT_SCHEMA_DATA_TYPE=protobuf`, data streams on Kafka topics are bound to a Protobuf schema. Follow the instructions
in [this article](https://developers.google.com/protocol-buffers/docs/javatutorial) on how to create, compile and
serialize a Protobuf object to send it to a binary OutputStream.
Refer [this guide](https://developers.google.com/protocol-buffers/docs/proto3) for detailed Protobuf syntax and rules
to create a `.proto` file.
When `INPUT_SCHEMA_DATA_TYPE=json`, data streams on Kafka topics are expected to carry valid JSON messages.

#### Can we select particular fields from the input message?

18 changes: 1 addition & 17 deletions docs/docs/reference/metrics.md
@@ -16,6 +16,7 @@ Service-level Indicators \(SLIs\) are the measurements used to calculate the per
* [HTTP Sink](metrics.md#http-sink)
* [Filter](metrics.md#filter)
* [Blob Sink](metrics.md#blob-sink)
* [Bigquery Sink](https://github.com/odpf/depot/blob/main/docs/reference/metrics.md#bigquery-sink)

## Type Details

@@ -368,20 +369,3 @@ Total Size of the uploaded file in bytes.
### `File Upload Records Total`

Total number of records inside files that have been successfully uploaded to blob storage.

## Bigquery Sink

### `Biquery Operation Total`

Total number of bigquery API operation performed

### `Bigquery Operation Latency`

Time taken for bigquery API operation performed

### `Bigquery Errors Total`

Total numbers of error occurred on bigquery insert operation.


