diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaTopicClient.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaTopicClient.java index 6860d71a6ef4..66fb57628d06 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaTopicClient.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedKafkaTopicClient.java @@ -20,6 +20,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.confluent.ksql.topic.TopicProperties; +import io.confluent.ksql.util.KsqlServerException; import io.confluent.ksql.util.LimitedProxyBuilder; import java.util.Collection; import java.util.Collections; @@ -29,8 +31,10 @@ import java.util.Map; import java.util.Objects; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -46,8 +50,9 @@ @SuppressWarnings("unused") // Methods invoked via reflection. 
final class SandboxedKafkaTopicClient { - static KafkaTopicClient createProxy(final KafkaTopicClient delegate) { - final SandboxedKafkaTopicClient sandbox = new SandboxedKafkaTopicClient(delegate); + static KafkaTopicClient createProxy(final KafkaTopicClient delegate, + final Supplier<Admin> sharedAdmin) { + final SandboxedKafkaTopicClient sandbox = new SandboxedKafkaTopicClient(delegate, sharedAdmin); return LimitedProxyBuilder.forClass(KafkaTopicClient.class) .forward("createTopic", @@ -63,12 +68,17 @@ static KafkaTopicClient createProxy(final KafkaTopicClient delegate) { .build(); } + private static final String DEFAULT_REPLICATION_PROP = "default.replication.factor"; + private final KafkaTopicClient delegate; + private final Supplier<Admin> adminClient; private final Map<String, TopicDescription> createdTopics = new HashMap<>(); - private SandboxedKafkaTopicClient(final KafkaTopicClient delegate) { + private SandboxedKafkaTopicClient(final KafkaTopicClient delegate, + final Supplier<Admin> sharedAdminClient) { this.delegate = Objects.requireNonNull(delegate, "delegate"); + this.adminClient = Objects.requireNonNull(sharedAdminClient, "sharedAdminClient"); } private void createTopic( @@ -90,7 +100,11 @@ private void createTopic( return; } - final List<Node> replicas = IntStream.range(0, replicationFactor) + final short resolvedReplicationFactor = replicationFactor == TopicProperties.DEFAULT_REPLICAS + ? 
getDefaultClusterReplication() + : replicationFactor; + + final List<Node> replicas = IntStream.range(0, resolvedReplicationFactor) + .mapToObj(idx -> (Node) null) + .collect(Collectors.toList()); @@ -103,7 +117,7 @@ private void createTopic( .collect(Collectors.toList()); // This is useful to validate permissions to create the topic - delegate.validateCreateTopic(topic, numPartitions, replicationFactor, configs); + delegate.validateCreateTopic(topic, numPartitions, resolvedReplicationFactor, configs); createdTopics.put(topic, new TopicDescription( topic, @@ -113,6 +127,19 @@ private void createTopic( )); } + private short getDefaultClusterReplication() { + try { + final String defaultReplication = KafkaClusterUtil.getConfig(adminClient.get()) + .get(DEFAULT_REPLICATION_PROP) + .value(); + return Short.parseShort(defaultReplication); + } catch (final KsqlServerException e) { + throw e; + } catch (final Exception e) { + throw new KsqlServerException("Could not get default replication from Kafka cluster!", e); + } + } + private boolean isTopicExists(final String topic) { if (createdTopics.containsKey(topic)) { return true; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedServiceContext.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedServiceContext.java index 6bb220140869..e4c3cde3fd92 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedServiceContext.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedServiceContext.java @@ -44,7 +44,7 @@ public static SandboxedServiceContext create(final ServiceContext serviceContext final KafkaClientSupplier kafkaClientSupplier = new SandboxedKafkaClientSupplier(); final KafkaTopicClient kafkaTopicClient = SandboxedKafkaTopicClient - .createProxy(serviceContext.getTopicClient()); + .createProxy(serviceContext.getTopicClient(), serviceContext::getAdminClient); final SchemaRegistryClient schemaRegistryClient = 
SandboxedSchemaRegistryClient.createProxy(serviceContext.getSchemaRegistryClient()); final ConnectClient connectClient = SandboxConnectClient.createProxy(); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaTopicClientTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaTopicClientTest.java index 868754451bbd..4380afddfaf5 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaTopicClientTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedKafkaTopicClientTest.java @@ -47,12 +47,21 @@ import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.clients.admin.DescribeConfigsResult; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.ConfigResource.Type; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.internals.KafkaFutureImpl; import org.junit.Before; import org.junit.Test; import org.junit.experimental.runners.Enclosed; @@ -94,7 +103,10 @@ public UnsupportedMethods(final TestCase testCase) { @Before public void setUp() { - sandboxedKafkaTopicClient = SandboxedKafkaTopicClient.createProxy(mock(KafkaTopicClient.class)); + sandboxedKafkaTopicClient = SandboxedKafkaTopicClient.createProxy( + mock(KafkaTopicClient.class), + () -> mock(Admin.class) + ); } @Test(expected = UnsupportedOperationException.class) @@ -108,12 
+120,18 @@ public static class SupportedMethods { @Mock private KafkaTopicClient delegate; + @Mock + private Admin mockedAdmin; + private KafkaTopicClient sandboxedClient; private final Map<String, ?> configs = ImmutableMap.of("some config", 1); @Before public void setUp() { - sandboxedClient = SandboxedKafkaTopicClient.createProxy(delegate); + sandboxedClient = SandboxedKafkaTopicClient.createProxy( + delegate, + () -> mockedAdmin + ); } @Test @@ -193,6 +211,51 @@ public void shouldTrackCreatedTopicsDetails() { Sets.newHashSet(AclOperation.READ, AclOperation.WRITE)))); } + @Test + public void shouldCreateTopicWithBrokerDefaultReplicationFactor() { + // Given: + final short defaultReplicationFactor = 5; + final short userBrokerDefaultReplicationFactor = -1; + mockAdmin(defaultReplicationFactor); + + // When: + sandboxedClient.createTopic("some topic", 2, userBrokerDefaultReplicationFactor, configs); + + // Then: + final TopicDescription result = sandboxedClient + .describeTopic("some topic"); + + assertThat(result, is(new TopicDescription( + "some topic", + false, + topicPartitions(2, defaultReplicationFactor), + Sets.newHashSet(AclOperation.READ, AclOperation.WRITE)) + )); + } + + private void mockAdmin(final short defaultReplicationFactor) { + final Node broker = mock(Node.class); + when(broker.idString()).thenReturn("someId"); + final KafkaFutureImpl<Collection<Node>> nodes = new KafkaFutureImpl<>(); + nodes.complete(Collections.singleton(broker)); + + final DescribeClusterResult mockCluster = mock(DescribeClusterResult.class); + when(mockCluster.nodes()).thenReturn(nodes); + when(mockedAdmin.describeCluster()).thenReturn(mockCluster); + + final KafkaFutureImpl<Map<ConfigResource, Config>> config = new KafkaFutureImpl<>(); + config.complete(Collections.singletonMap( + new ConfigResource(Type.BROKER, "someId"), + new Config(Collections.singleton( + new ConfigEntry("default.replication.factor", String.valueOf(defaultReplicationFactor)) + ))) + ); + final DescribeConfigsResult mockConfigs = mock 
(DescribeConfigsResult.class); + when(mockConfigs.all()).thenReturn(config); + + when(mockedAdmin.describeConfigs(any())).thenReturn(mockConfigs); + } + @Test public void shouldThrowOnCreateIfValidateCreateTopicFails() { // Given: diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/FakeKafkaClientSupplier.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/FakeKafkaClientSupplier.java index 48fc7ca30eb9..6a50f7405fbf 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/FakeKafkaClientSupplier.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/FakeKafkaClientSupplier.java @@ -31,7 +31,7 @@ public class FakeKafkaClientSupplier implements KafkaClientSupplier { @Override public Admin getAdmin(final Map<String, Object> config) { - final Node node = new Node(1, "localhost", 1234); + final Node node = new Node(0, "localhost", 1234); return new MockAdminClient(Collections.singletonList(node), node); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 07455331d615..84c5a97f768b 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -80,6 +80,7 @@ import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.mockito.Mock; @@ -590,6 +591,7 @@ private void shouldRecover(final List commands) { @Before public void setUp() { topicClient.preconditionTopicExists("A"); + topicClient.preconditionTopicExists("B"); topicClient.preconditionTopicExists("command_topic"); } @@ -637,7 +639,6 @@ public void shouldRecoverReplacesWithTerminates() { shouldRecover(commands); } - @Test public void shouldRecoverInsertIntos() { server1.submitCommands(