Skip to content

Commit

Permalink
fix: switch AdminClient to be sandbox proxy (#3351)
Browse files Browse the repository at this point in the history
Now that Kafka has an `Admin` interface we can switch our `SandboxedAdminClient` from extending the abstract `AdminClient` to being a proxy.  This decouples us from most upstream changes to the API.
  • Loading branch information
big-andy-coates authored Sep 13, 2019
1 parent 8c8da54 commit 6747d5c
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,78 +15,10 @@

package io.confluent.ksql.services;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.AlterReplicaLogDirsOptions;
import org.apache.kafka.clients.admin.AlterReplicaLogDirsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateAclsOptions;
import org.apache.kafka.clients.admin.CreateAclsResult;
import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
import org.apache.kafka.clients.admin.CreatePartitionsOptions;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteAclsOptions;
import org.apache.kafka.clients.admin.DeleteAclsResult;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.clients.admin.DeleteRecordsOptions;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeAclsOptions;
import org.apache.kafka.clients.admin.DescribeAclsResult;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
import org.apache.kafka.clients.admin.DescribeLogDirsOptions;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsOptions;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ElectLeadersOptions;
import org.apache.kafka.clients.admin.ElectLeadersResult;
import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.admin.ListPartitionReassignmentsOptions;
import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
import static io.confluent.ksql.util.LimitedProxyBuilder.anyParams;

import io.confluent.ksql.util.LimitedProxyBuilder;
import org.apache.kafka.clients.admin.Admin;

/**
* An admin client to use while trying out operations.
Expand All @@ -96,231 +28,14 @@
* <p>Most operations result in a {@code UnsupportedOperationException} being thrown as they are
* not called.
*/
class SandboxedAdminClient extends AdminClient {

SandboxedAdminClient() {
}

@Override
public void close(final Duration duration) {
// No op
}

@Override
public CreateTopicsResult createTopics(
final Collection<NewTopic> newTopics,
final CreateTopicsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DeleteTopicsResult deleteTopics(
final Collection<String> topics,
final DeleteTopicsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public ListTopicsResult listTopics(final ListTopicsOptions options) {
throw new UnsupportedOperationException();
}

@Override
public DescribeTopicsResult describeTopics(
final Collection<String> topics,
final DescribeTopicsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DescribeClusterResult describeCluster(final DescribeClusterOptions options) {
throw new UnsupportedOperationException();
}

@Override
public DescribeAclsResult describeAcls(
final AclBindingFilter filter,
final DescribeAclsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public CreateAclsResult createAcls(
final Collection<AclBinding> acls,
final CreateAclsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DeleteAclsResult deleteAcls(
final Collection<AclBindingFilter> acls,
final DeleteAclsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DescribeConfigsResult describeConfigs(
final Collection<ConfigResource> configs,
final DescribeConfigsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public AlterConfigsResult incrementalAlterConfigs(
final Map<ConfigResource, Collection<AlterConfigOp>> configs,
final AlterConfigsOptions options) {
throw new UnsupportedOperationException();
}

@SuppressWarnings({"deprecation", "RedundantSuppression"})
@Override
public AlterConfigsResult alterConfigs(
final Map<ConfigResource, Config> configs,
final AlterConfigsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public AlterReplicaLogDirsResult alterReplicaLogDirs(
final Map<TopicPartitionReplica, String> replicaAssignment,
final AlterReplicaLogDirsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DescribeLogDirsResult describeLogDirs(
final Collection<Integer> brokers,
final DescribeLogDirsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DescribeReplicaLogDirsResult describeReplicaLogDirs(
final Collection<TopicPartitionReplica> replicas,
final DescribeReplicaLogDirsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public CreatePartitionsResult createPartitions(
final Map<String, NewPartitions> newPartitions,
final CreatePartitionsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DeleteRecordsResult deleteRecords(
final Map<TopicPartition, RecordsToDelete> recordsToDelete,
final DeleteRecordsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public CreateDelegationTokenResult createDelegationToken(
final CreateDelegationTokenOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public RenewDelegationTokenResult renewDelegationToken(
final byte[] hmac,
final RenewDelegationTokenOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public ExpireDelegationTokenResult expireDelegationToken(
final byte[] hmac,
final ExpireDelegationTokenOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DescribeDelegationTokenResult describeDelegationToken(
final DescribeDelegationTokenOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DescribeConsumerGroupsResult describeConsumerGroups(
final Collection<String> groupIds,
final DescribeConsumerGroupsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public ListConsumerGroupsResult listConsumerGroups(final ListConsumerGroupsOptions options) {
throw new UnsupportedOperationException();
}

@Override
public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(
final String groupId,
final ListConsumerGroupOffsetsOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public DeleteConsumerGroupsResult deleteConsumerGroups(
final Collection<String> groupIds,
final DeleteConsumerGroupsOptions options
) {
throw new UnsupportedOperationException();
}

@SuppressWarnings("deprecation")
@Override
public org.apache.kafka.clients.admin.ElectPreferredLeadersResult electPreferredLeaders(
final Collection<TopicPartition> partitions,
final org.apache.kafka.clients.admin.ElectPreferredLeadersOptions options
) {
throw new UnsupportedOperationException();
}

@Override
public ElectLeadersResult electLeaders(
final ElectionType electionType,
final Set<TopicPartition> set,
final ElectLeadersOptions electLeadersOptions
) {
throw new UnsupportedOperationException();
}

@Override
public AlterPartitionReassignmentsResult alterPartitionReassignments(
final Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
final AlterPartitionReassignmentsOptions options) {
throw new UnsupportedOperationException();
}
final class SandboxedAdminClient {

@Override
public ListPartitionReassignmentsResult listPartitionReassignments(
final Optional<Set<TopicPartition>> partitions,
final ListPartitionReassignmentsOptions options) {
throw new UnsupportedOperationException();
static Admin createProxy() {
return LimitedProxyBuilder.forClass(Admin.class)
.swallow("close", anyParams())
.build();
}

@Override
public Map<MetricName, ? extends Metric> metrics() {
throw new UnsupportedOperationException();
private SandboxedAdminClient() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SandboxedKafkaClientSupplier implements KafkaClientSupplier {

@Override
public Admin getAdmin(final Map<String, Object> config) {
return new SandboxedAdminClient();
return SandboxedAdminClient.createProxy();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ElectLeadersOptions;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -51,16 +50,16 @@ public static Collection<TestCase<Admin>> getMethodsToTest() {
.build();
}

private final TestCase<AdminClient> testCase;
private AdminClient sandboxedAdminClient;
private final TestCase<Admin> testCase;
private Admin sandboxedAdminClient;

public UnsupportedMethods(final TestCase<AdminClient> testCase) {
public UnsupportedMethods(final TestCase<Admin> testCase) {
this.testCase = Objects.requireNonNull(testCase, "testCase");
}

@Before
public void setUp() {
sandboxedAdminClient = new SandboxedAdminClient();
sandboxedAdminClient = SandboxedAdminClient.createProxy();
}

@Test(expected = UnsupportedOperationException.class)
Expand All @@ -71,11 +70,11 @@ public void shouldThrowOnUnsupportedOperation() throws Throwable {

public static class SupportedMethods {

private AdminClient sandboxedAdminClient;
private Admin sandboxedAdminClient;

@Before
public void setUp() {
sandboxedAdminClient = new SandboxedAdminClient();
sandboxedAdminClient = SandboxedAdminClient.createProxy();
}

@SuppressWarnings("deprecation")
Expand Down
Loading

0 comments on commit 6747d5c

Please sign in to comment.