Provides an example on how to make Apache Kafka data stored as Google Protobuf visible.
The project uses a Protobuf schema stored in credit_card.proto to generate the Java classes.
syntax = "proto2";
package io.lenses.examples.serde.protobuf;
option java_package = "io.lenses.examples.serde.protobuf.generated";
option java_outer_classname = "CardData";
message CreditCard {
required string name = 1;
required string country = 2;
required string currency = 3;
required string cardNumber = 4;
required bool blocked = 5;
required string type = 6;
}
Using the maven plugin protoc-jar-maven-plugin
the Java classes are generated under
src/io/lenses/examples/serde/protobuf/generated.
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.2.0.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<inputDirectories>
<include>src/main/proto</include>
</inputDirectories>
<outputTargets>
<outputTarget>
<type>java</type>
<outputDirectory>src/main/java</outputDirectory>
</outputTarget>
</outputTargets>
</configuration>
</execution>
</executions>
</plugin>
Lenses exposes an interface Serde
which needs to be implemented and then the artifacts dropped into the installation folder.
Check http://docs.lenses.io for details.
<dependency>
<groupId>com.landoop</groupId>
<artifactId>lenses-serde</artifactId>
<version>${lenses.serdes.version}</version>
</dependency>
@Override
public Serializer serializer(Properties properties) {
//not required
throw new NotImplementedException();
}
@Override
public Deserializer deserializer(Properties properties) {
// REQUIRED
}
@Override
public Schema getSchema() {
// REQUIRED
}
The plugin implementation has to code the two methods:
getSchema()
- describes the payload structure. Returns Avro Schemadeserializer(Properties properties)
- contains the logic to translate the raw bytes stored in Kafka as Avro GenericRecord
At the moment, the serializer(Properties properties)
is not required (not used).
For the example there are two implementations provided:
- CreditCardAutoProtobufSerde
- CreditCardProtobufSerde
The first class is more generic, however not as performant. The second one is the reverse - it yields better performance at the expense of more coding.
Returning the Avro schema relies on entirely Avro library to extract it from the generated Protobuf classes
private final static Schema schema = ProtobufData.get().getSchema(CardData.CreditCard.class);
@Override
public Schema getSchema() {
return schema;
}
Deserialization code takes the Kafka payload, which is raw bytes, and lifts it into a GenericRecord
.
First the raw bytes are translated via the Google API to the CreditCard
:
CardData.CreditCard card = CardData.CreditCard.parseFrom(bytes);
Next, the card details are written to an in memory array as Avro:
ProtobufDatumWriter<CardData.CreditCard> pbWriter = new ProtobufDatumWriter<CardData.CreditCard>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
pbWriter.write(card, encoder);
encoder.flush();
Last step is to read the Avro bytes as GenericRecord
:
GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord record = datumReader.read(null, decoder);
return record;
Going from bytes to CreditCard
to bytes to GenericRecord
can be short-circuited by avoiding the intermediary bytes set.
This is where this second implementation comes into play.
For this implementation the intermediary step is skipped.
This means there needs to be more manual code to populate the GenericRecord
.
@Override
public Deserializer deserializer(Properties properties) {
return new Deserializer() {
@Override
public GenericRecord deserialize(byte[] bytes) throws IOException {
CardData.CreditCard card = CardData.CreditCard.parseFrom(bytes);
GenericRecord record = new GenericData.Record(schema);
record.put("name", card.getName());
record.put("cardNumber", card.getCardNumber());
record.put("cardType", card.getType());
record.put("country", card.getCountry());
record.put("currency", card.getCurrency());
record.put("blocked", card.getBlocked());
return record;
}
@Override
public void close() throws IOException {
}
};
}
If the structure were to be nested say NestedObj
:
{
"a": 123,
"b": {
"x":"value1"
}
}
Then the code has to create a GenericRecord
for the b
field and one for the NestedObject
.
First step is to create the schemas:
private Schema fieldBSchema = SchemaBuilder.builder()
.record("field_b")
.fields()
.requiredString("x")
.endRecord()
private Schema schema = SchemaBuilder.builder()
.record("nested_obj")
.fields()
.requiredInt("a")
.name("b")
.type(fieldBSchema)
.noDefault()
.endRecord();
Next, the deserializer code needs to create and populate the GenericRecord
[-s]:
@Override
public Deserializer deserializer(Properties properties) {
return new Deserializer() {
@Override
public GenericRecord deserialize(byte[] bytes) throws IOException {
NestedObj obj = NestedObj.parseFrom(bytes);
GenericRecord record = new GenericData.Record(schema);
record.put("a", obj.getA());
GenericRecord recordFieldB = new GenericData.Record(fieldBSchema);
recordFieldB.put("x", obj.getB().getX());
record.put("b", recordFieldB);
return record;
}
@Override
public void close() throws IOException {
}
};
}
To compile:
mvn clean compile
To create the jar:
mvn clean package
Follow the docs(https://docs.lenses.io/4.0/configuration/sql/kubernetes/#custom-serde) and provide to Lenses:
target/lenses-serde-protobuf-example-1.0.0.jar
deps/avro-protobuf-1.8.2.jar