A simple API for creating an embedded Kafka environment with the KafkaEnvironment class, typically used for running integration tests.
Based on the Confluent Open Source distribution v7.2.x.
Instead of using the classic ports (2181, 9092, ...) for each server, the class will get the required number of available ports and use those in configurations for each server.
class KafkaEnvironment(
noOfBrokers: Int = 1,
topicNames: List<String> = emptyList(),
topicInfos: List<TopicInfo> = emptyList(),
withSchemaRegistry: Boolean = false,
val withSecurity: Boolean = false,
users: List<JAASCredential> = emptyList(),
autoStart: Boolean = false,
brokerConfigOverrides: Properties = Properties()
) : AutoCloseable {
data class TopicInfo(val name: String, val partitions: Int = 2, val config: Map<String, String>? = null)
}
fun start() // start servers in correct order
fun stop() // stop servers in correct order - session data are available
fun tearDown() // when finished with the kafka environment, stops servers and remove session data
Maximum allowed number of brokers is 2.
The 'withSecurity' parameter gives a kafka cluster with security, thus authentication and authorization
- Secured Zookeeper, kafka broker must authenticate and be authorized
- Secured Kafka broker (and inter broker), clients (including Schema registry) must authenticate and be authorized
- No security for Schema Registry clients
- No TLS
See JAASContext.kt
for details and predefined set of producer and consumer credentials.
Observe that auto creation of topics is disabled when security is enabled.
The 'users' parameter is an option for custom set of producer and consumer credentials. Only relevant when security is enabled.
Add the dependency:
dependencies {
testImplementation "no.nav:kafka-embedded-env:3.2.4"
}
<dependency>
<groupId>no.nav</groupId>
<artifactId>kafka-embedded-env</artifactId>
<version>3.2.4</version>
<scope>test</scope>
</dependency>
Note: It is recommended that you use the Confluent version matching this library - currently v7.2.x
val kafkaEnv = KafkaEnvironment()
kafkaEnv.start()
// do stuff
kafkaEnv.tearDown()
The default settings gives
- 1 Zookeeper
- 1 Kafka broker
val kafkaEnv = KafkaEnvironment(
noOfBrokers = 2,
topicNames = listOf("test1", "test2", "test3"),
withSchemaRegistry = true,
autoStart = true
)
// do stuff
kafkaEnv.tearDown()
The above configuration gives
- 1 Zookeeper instance
- 2 Kafka brokers
- 1 Schema Registry instance
Given topics are automatically created and all servers are started in correct order - ready to use. Each topic will have number of partitions equal to number of brokers.
val kafkaEnv = KafkaEnvironment(
noOfBrokers = 2,
topicNames = listOf("custom1"),
withSecurity = true,
users = listOf(JAASCredential("myP1", "myP1p"),JAASCredential("myC1", "myC1p")),
autoStart = true
)
// do stuff
kafkaEnv.tearDown()
The above configuration gives
- 1 Zookeeper instance
- 2 Kafka brokers
Given users are added to Kafka brokers JAAS context (authentication) and the topic is automatically created. Observe that relevant authorization must be given before produce and consume scenario is activated.
For 'crash course' approach, see relevant test cases with security in KafkaEnvironmentSpec.kt
An instance of KafkaEnvironment has a serverPark (ServerPark) property, giving access to details depending on the state. Each server (ServerBase) has a few relevant properties and start/stop functions.
data class ServerPark(
val zookeeper: ServerBase,
val brokerStatus: BrokerStatus,
val schemaRegStatus: SchemaRegistryStatus,
val status: ServerParkStatus
)
abstract class ServerBase {
protected var status: ServerStatus = NotRunning
open val host: String = "localhost"
abstract val port: Int
abstract val url: String
abstract fun start()
abstract fun stop()
}
Thus each server can be stopped and started independently.
In order to ease the state handling, some properties are available.
val zookeeper get() = serverPark.zookeeper as ZKServer
val brokers get() = serverPark.getBrokers()
val brokersURL get() = serverPark.getBrokersURL()
val adminClient get() = serverPark.getAdminClient()
val schemaRegistry get() = serverPark.getSchemaReg()
Be aware of what you are doing
- kafka environment without brokers (noOfBrokers = 0), gives empty list for 'brokers'
- non started kafka environment gives 'null' for 'adminClient'
- a started kafka environment without brokers still gives 'null' for 'adminClient'
- ...
The 'adminClient' property creates an instance of AdminClient with super user authorization. Thus, feel free to play with it. See Kafka AdminClient API for the set of available operations.
Please close adminClient after use.
class KafkaEnvironment
:
- The parameter
topics
should be renamed totopicNames
if you wish to keep the behaviour from previous versions.
Build and test
./mvnw clean install
Create an issue here on the GitHub issue tracker. Pull requests are also welcome.
Internal resources may reach us on Slack in the #kafka channel.