fix: SandboxedKafkaTopicClient should use default replication factor if applicable (#8551)

If replication factor `-1` is used (meaning the broker default should apply), SandboxedKafkaTopicClient created a mocked topic with replication factor 0 instead of resolving the broker default.

If multiple statements are submitted at once (i.e., via a file), later statements inherit the incorrect replication factor of 0 and thus fail, which fails all statements in the file.

This PR fixes SandboxedKafkaTopicClient to use the correct default replication factor, allowing multiple dependent statements to be submitted at once.
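
The heart of the fix is resolving `-1` (TopicProperties.DEFAULT_REPLICAS) to the broker's default.replication.factor before the sandbox mocks the topic. A minimal standalone sketch of that resolution, using only public Kafka Admin client APIs (the class and method names below are illustrative, not the PR's actual code):

// Sketch only: resolve a requested replication factor, where -1 means
// "use the broker default", by reading "default.replication.factor"
// from any broker via the Admin client.
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;

final class ReplicationFactorResolver {

  private static final short USE_BROKER_DEFAULT = -1; // TopicProperties.DEFAULT_REPLICAS

  private final Admin admin;

  ReplicationFactorResolver(final Admin admin) {
    this.admin = admin;
  }

  short resolve(final short requested) throws ExecutionException, InterruptedException {
    if (requested != USE_BROKER_DEFAULT) {
      return requested; // an explicit replication factor wins
    }
    // Ask any broker for its configuration and read the cluster-wide default.
    final Node broker = admin.describeCluster().nodes().get().iterator().next();
    final ConfigResource resource =
        new ConfigResource(ConfigResource.Type.BROKER, broker.idString());
    final Map<ConfigResource, Config> configs =
        admin.describeConfigs(Collections.singleton(resource)).all().get();
    return Short.parseShort(
        configs.get(resource).get("default.replication.factor").value());
  }
}

In the PR itself this lookup lives in SandboxedKafkaTopicClient.getDefaultClusterReplication(), which delegates to KafkaClusterUtil.getConfig and wraps failures in a KsqlServerException.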
mjsax committed Jan 6, 2022
1 parent e7b0bd0 commit 5c8c186
Showing 5 changed files with 101 additions and 10 deletions.

SandboxedKafkaTopicClient.java
@@ -20,6 +20,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.confluent.ksql.topic.TopicProperties;
+import io.confluent.ksql.util.KsqlServerException;
 import io.confluent.ksql.util.LimitedProxyBuilder;
 import java.util.Collection;
 import java.util.Collections;
@@ -29,8 +31,10 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -46,8 +50,9 @@
@SuppressWarnings("unused") // Methods invoked via reflection.
final class SandboxedKafkaTopicClient {

static KafkaTopicClient createProxy(final KafkaTopicClient delegate) {
final SandboxedKafkaTopicClient sandbox = new SandboxedKafkaTopicClient(delegate);
static KafkaTopicClient createProxy(final KafkaTopicClient delegate,
final Supplier<Admin> sharedAdmin) {
final SandboxedKafkaTopicClient sandbox = new SandboxedKafkaTopicClient(delegate, sharedAdmin);

return LimitedProxyBuilder.forClass(KafkaTopicClient.class)
.forward("createTopic",
@@ -63,12 +68,17 @@ static KafkaTopicClient createProxy(final KafkaTopicClient delegate) {
         .build();
   }
 
+  private static final String DEFAULT_REPLICATION_PROP = "default.replication.factor";
+
   private final KafkaTopicClient delegate;
+  private final Supplier<Admin> adminClient;
 
   private final Map<String, TopicDescription> createdTopics = new HashMap<>();
 
-  private SandboxedKafkaTopicClient(final KafkaTopicClient delegate) {
+  private SandboxedKafkaTopicClient(final KafkaTopicClient delegate,
+      final Supplier<Admin> sharedAdminClient) {
     this.delegate = Objects.requireNonNull(delegate, "delegate");
+    this.adminClient = Objects.requireNonNull(sharedAdminClient, "sharedAdminClient");
   }
 
   private void createTopic(
@@ -90,7 +100,11 @@ private void createTopic(
       return;
     }
 
-    final List<Node> replicas = IntStream.range(0, replicationFactor)
+    final short resolvedReplicationFactor = replicationFactor == TopicProperties.DEFAULT_REPLICAS
+        ? getDefaultClusterReplication()
+        : replicationFactor;
+
+    final List<Node> replicas = IntStream.range(0, resolvedReplicationFactor)
         .mapToObj(idx -> (Node) null)
         .collect(Collectors.toList());
 
@@ -103,7 +117,7 @@ private void createTopic(
         .collect(Collectors.toList());
 
     // This is useful to validate permissions to create the topic
-    delegate.validateCreateTopic(topic, numPartitions, replicationFactor, configs);
+    delegate.validateCreateTopic(topic, numPartitions, resolvedReplicationFactor, configs);
 
     createdTopics.put(topic, new TopicDescription(
         topic,
@@ -113,6 +127,19 @@ private void createTopic(
     ));
   }
 
+  private short getDefaultClusterReplication() {
+    try {
+      final String defaultReplication = KafkaClusterUtil.getConfig(adminClient.get())
+          .get(DEFAULT_REPLICATION_PROP)
+          .value();
+      return Short.parseShort(defaultReplication);
+    } catch (final KsqlServerException e) {
+      throw e;
+    } catch (final Exception e) {
+      throw new KsqlServerException("Could not get default replication from Kafka cluster!", e);
+    }
+  }
+
   private boolean isTopicExists(final String topic) {
     if (createdTopics.containsKey(topic)) {
       return true;

SandboxedServiceContext.java
@@ -44,7 +44,7 @@ public static SandboxedServiceContext create(final ServiceContext serviceContext

     final KafkaClientSupplier kafkaClientSupplier = new SandboxedKafkaClientSupplier();
     final KafkaTopicClient kafkaTopicClient = SandboxedKafkaTopicClient
-        .createProxy(serviceContext.getTopicClient());
+        .createProxy(serviceContext.getTopicClient(), serviceContext::getAdminClient);
     final SchemaRegistryClient schemaRegistryClient =
         SandboxedSchemaRegistryClient.createProxy(serviceContext.getSchemaRegistryClient());
     final ConnectClient connectClient = SandboxConnectClient.createProxy();

SandboxedKafkaTopicClientTest.java
@@ -47,12 +47,21 @@
 import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
 import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.ConfigResource.Type;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
@@ -94,7 +103,10 @@ public UnsupportedMethods(final TestCase<KafkaTopicClient> testCase) {

     @Before
     public void setUp() {
-      sandboxedKafkaTopicClient = SandboxedKafkaTopicClient.createProxy(mock(KafkaTopicClient.class));
+      sandboxedKafkaTopicClient = SandboxedKafkaTopicClient.createProxy(
+          mock(KafkaTopicClient.class),
+          () -> mock(Admin.class)
+      );
     }
 
     @Test(expected = UnsupportedOperationException.class)
@@ -108,12 +120,18 @@ public static class SupportedMethods {

     @Mock
     private KafkaTopicClient delegate;
+    @Mock
+    private Admin mockedAdmin;
 
     private KafkaTopicClient sandboxedClient;
     private final Map<String, ?> configs = ImmutableMap.of("some config", 1);
 
     @Before
     public void setUp() {
-      sandboxedClient = SandboxedKafkaTopicClient.createProxy(delegate);
+      sandboxedClient = SandboxedKafkaTopicClient.createProxy(
+          delegate,
+          () -> mockedAdmin
+      );
     }
 
     @Test
@@ -193,6 +211,51 @@ public void shouldTrackCreatedTopicsDetails() {
           Sets.newHashSet(AclOperation.READ, AclOperation.WRITE))));
     }
 
+    @Test
+    public void shouldCreateTopicWithBrokerDefaultReplicationFactor() {
+      // Given:
+      final short defaultReplicationFactor = 5;
+      final short userBrokerDefaultReplicationFactor = -1;
+      mockAdmin(defaultReplicationFactor);
+
+      // When:
+      sandboxedClient.createTopic("some topic", 2, userBrokerDefaultReplicationFactor, configs);
+
+      // Then:
+      final TopicDescription result = sandboxedClient
+          .describeTopic("some topic");
+
+      assertThat(result, is(new TopicDescription(
+          "some topic",
+          false,
+          topicPartitions(2, defaultReplicationFactor),
+          Sets.newHashSet(AclOperation.READ, AclOperation.WRITE))
+      ));
+    }
+
+    private void mockAdmin(final short defaultReplicationFactor) {
+      final Node broker = mock(Node.class);
+      when(broker.idString()).thenReturn("someId");
+      final KafkaFutureImpl<Collection<Node>> nodes = new KafkaFutureImpl<>();
+      nodes.complete(Collections.singleton(broker));
+
+      final DescribeClusterResult mockCluster = mock(DescribeClusterResult.class);
+      when(mockCluster.nodes()).thenReturn(nodes);
+      when(mockedAdmin.describeCluster()).thenReturn(mockCluster);
+
+      final KafkaFutureImpl<Map<ConfigResource, Config>> config = new KafkaFutureImpl<>();
+      config.complete(Collections.singletonMap(
+          new ConfigResource(Type.BROKER, "someId"),
+          new Config(Collections.singleton(
+              new ConfigEntry("default.replication.factor", String.valueOf(defaultReplicationFactor))
+          )))
+      );
+      final DescribeConfigsResult mockConfigs = mock(DescribeConfigsResult.class);
+      when(mockConfigs.all()).thenReturn(config);
+
+      when(mockedAdmin.describeConfigs(any())).thenReturn(mockConfigs);
+    }
+
     @Test
     public void shouldThrowOnCreateIfValidateCreateTopicFails() {
       // Given:

FakeKafkaClientSupplier.java
@@ -31,7 +31,7 @@ public class FakeKafkaClientSupplier implements KafkaClientSupplier {

   @Override
   public Admin getAdmin(final Map<String, Object> config) {
-    final Node node = new Node(1, "localhost", 1234);
+    final Node node = new Node(0, "localhost", 1234);
     return new MockAdminClient(Collections.singletonList(node), node);
   }


RecoveryTest.java
@@ -80,6 +80,7 @@
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mock;

@@ -590,6 +591,7 @@ private void shouldRecover(final List<QueuedCommand> commands) {
   @Before
   public void setUp() {
     topicClient.preconditionTopicExists("A");
+    topicClient.preconditionTopicExists("B");
     topicClient.preconditionTopicExists("command_topic");
   }

@@ -637,7 +639,6 @@ public void shouldRecoverReplacesWithTerminates() {
     shouldRecover(commands);
   }
 
-
   @Test
   public void shouldRecoverInsertIntos() {
     server1.submitCommands(
