This repository is no longer actively maintained and has been deprecated. The codebase has been seamlessly integrated into the Stream Reactor project, starting from version 6. As a result, ongoing development and improvements are now being conducted within the Stream Reactor project.
The KCQL (Kafka Connect Query Languages) is a SQL like syntax allowing a streamlined configuration of a Kafka Connect Sink/Source. It is build using the antlr4
API.
While working on our sink/sources we ended up producing quite complex configuration in order to support the functionality required. Imagine a sink where you source from different topics and from each topic you want to cherry pick the payload fields or even rename them. Furthermore, you might want the storage structure to be automatically created and/or even evolve or you might add new support for the likes of bucketing (Riak TS has one such scenario). Imagine the JDBC sink with a table which needs to be linked to two different topics and the fields in there need to be aligned with the table column names and the complex configuration involved ...or you can just write this
routes.query = "INSERT INTO transactions SELECT field1 as column1, field2 as column2, field3 FROM topic_A;
INSERT INTO transactions SELECT fieldA1 as column1, fieldA2 as column2, fieldC FROM topic_B;"
This project is using the Gradle build system. So to build you would simply do
gradle clean build
If you modify the grammar you would need to first compile before the changes are reflected in the code. The antlr gradle plugin would run first and produced the java classes for the parser and lexer.
To include it in your project you, include it in your connector.
Maven
<dependency>
<groupId>com.datamountaineer</groupId>
<artifactId>kcql</artifactId>
<version>LATEST</version>
</dependency>
Check Maven for latest release.
There are three modes in the DSL : INSERT, UPSERT and SELECT.
The INSERT and UPSERT queries, support some of the following configurations:
INSERT INTO $TARGET
SELECT *|columns
FROM $TOPIC_NAME
[ IGNORE columns ]
[ AUTOCREATE ]
[ PK columns ]
[ AUTOEVOLVE ]
[ BATCH = N ]
[ CAPITALIZE ]
[ INITIALIZE ]
[ PARTITIONBY cola[,colb] ]
[ DISTRIBUTEBY cola[,colb] ]
[ CLUSTERBY cola[,colb] ]
[ WITHTIMESTAMP cola|sys_time() ]
[ STOREAS $YOUR_TYPE([key=value, .....]) ]
[ WITHFORMAT TEXT|AVRO|JSON|BINARY|OBJECT|MAP ]
[ PROPERTIES(key=value, ...) ]
- To view how a sink connector (i.e. Cassandra) manage configuration options, refer to documentation here
The SELECT mode is usefull for target systems that do not support the concept of (i.e. a an in-memory Key-Value system does not have the concept of a .
) and can also be utilized in the socket-streamer to peek into KAFKA via websockets and receive the payloads in real time.SELECT *|columns
FROM $TOPIC_NAME
[ IGNORE columns ]
[ WITHFORMAT JSON|AVRO|BINARY ]
[ WITHGROUP $YOUR_CONSUMER_GROUP ]
[ WITHPARTITION (partition),[(partition, offset) ]
[ STOREAS $YOUR_TYPE([key=value, .....]) ]
[ SAMPLE $RECORDS_NUMBER EVERY $SLIDE_WINDOW ]
SELECT field1 FROM mytopic // Project one avro field named field1
SELECT field1 AS newName // Project and renames a field
SELECT * FROM mytopic // Select everything - perfect for avro evolution
SELECT *, field1 AS newName FROM mytopic // Select all & rename a field - excellent for avro evolution
SELECT * FROM mytopic IGNORE badField // Select all & ignore a field - excellent for avro evolution
SELECT * FROM mytopic PK field1,field2 // Select all & with primary keys (for the sources where primary keys are required)
SELECT * FROM mytopic AUTOCREATE // Select all and create the target source (table for databases)
SELECT * FROM mytopic AUTOEVOLVE // Select all & reflect the new fields added to the avro payload into the target
.. NOOP | THROW | RETRY // Define the error policy
.. WHERE .. // Add filtering rules