A Kafka Connect Single Message Transformation (SMT) that reads the serialized wire format header of Confluent's KafkaAvroSerializer
, performs a lookup against a source Confluent Schema Registry for the ID in the message, and registers that schema into a destination Registry for that topic/subject under a new ID.
To be used where it is not feasible to make the destination Schema Registry as a follower to the source Registry, or when migrating topics to a new cluster.
Requires that the Kafka Connect tasks can reach both Schema Registries.
This transform doesn't mirror the contents of the _schemas
topic, so therefore each registry can be completely isolated from one another. As a side-effect of this, the subject configurations that might be applied to the /config
endpoint in the source registry are not copied to the destination. In other words, you might get schema registration errors if using differing compatibility levels on the registries. Just a heads-up.
Example Kafka Connectors where this could be applied.
- Comcast/MirrorTool-for-Kafka-Connect - Code was tested with this first, and verified that the topic-renaming logic of this connector worked fine with this SMT.
- Salesforce/mirus
- Confluent Replicator - While this already can copy the schema, we observed it is only possible via the
AvroConverter
, which must first parse the entire message into a Kafka ConnectStruct
object. Thus, the class here is considered a "shallow" copier — it only inspects the first 5 bytes of the keys and values for the schema ids. - KIP-382 (MirrorMaker 2.0) - Still open at the time of writing.
- Edit the Kafka Connect worker properties file on each worker to include a new directory. For example,
/opt/kafka-connect/plugins
plugin.path=/usr/share/java,/opt/kafka-connect/plugins
- Build this project
./mvnw clean package
-
Copy the JAR from
target
to all Kafka Connect workers under a directory set byplugin.path
-
(Re)start Kafka Connect processes
Standalone Kafka Connect configuration section
# Requires that records are entirely byte-arrays. These can go in the worker or connector configuration.
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# Setup the SMT
transforms=AvroSchemaTransfer
transforms.AvroSchemaTransfer.type=cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer
transforms.AvroSchemaTransfer.src.schema.registry.url=http://schema-registry-1:8081
transforms.AvroSchemaTransfer.dest.schema.registry.url=http://schema-registry-2:8081
Distributed Kafka Connect configuration section
"config" : {
...
"__comment": "Requires that records are entirely byte-arrays. These can go in the worker or connector configuration.",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"__comment": "Setup the SMT",
"transforms": "AvroSchemaTransfer",
"transforms.AvroSchemaTransfer.type": "cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer",
"transforms.AvroSchemaTransfer.src.schema.registry.url": "http://schema-registry-1:8081",
"transforms.AvroSchemaTransfer.dest.schema.registry.url": "http://schema-registry-2:8081"
}
Configuration Parameter | Default | Description |
---|---|---|
transfer.message.keys | true | Indicates whether Avro schemas from message keys in source records should be copied to the destination Registry. |
include.message.headers | true | Indicates whether message headers from source records should be preserved after the transform. |
schema.capacity | 100 | Capacity of schemas that can be cached in each CachedSchemaRegistryClient |
Schema Registry Transfer SMT passes some properties prefixed by either src.
or dest.
through to its embedded schema registry clients, after stripping away src.
or dest.
prefix used to disambiguate which client is to receive which configuration value.
Properties prefixed by src.
are passed through to the source consumer's schema registry
client. Properties prefixed by dest.
are passed through to the target producer's schema
registry client.
Configuration Parameter | Default | Description |
---|---|---|
(src|dest).basic.auth.credentials.source | URL | Specify how to pick credentials for Basic Auth header. Supported values are URL , USER_INFO and SASL_INHERIT |
(src|dest).basic.auth.user.info | Specify credentials for Basic Auth in form of {username}:{password} when source is USER_INFO |
Renaming of a subject can be done with the RegexRouter
Transform before this one.
Example Configuration
transforms=TopicRename,AvroSchemaTransfer
transforms.TopicRename.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.TopicRename.regex=(.*)
transforms.TopicRename.replacement=replica.$1
transforms.AvroSchemaTransfer.type=...