From 99a16641add6dbb302bbc71452a29d614db0bf3b Mon Sep 17 00:00:00 2001 From: Rohan Date: Mon, 19 Oct 2020 21:30:28 -0700 Subject: [PATCH] fix: check for nested UnspportedVersionException during auth op check --- .../ksql/services/KafkaClusterUtil.java | 7 ++- ...KsqlAuthorizationValidatorFactoryTest.java | 58 +++++++++++++++---- 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaClusterUtil.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaClusterUtil.java index 667fb9d08c2a..fe4641117c64 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaClusterUtil.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/KafkaClusterUtil.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.DescribeClusterOptions; @@ -51,9 +52,11 @@ public static boolean isAuthorizedOperationsSupported(final Admin adminClient) { ); return authorizedOperations.authorizedOperations().get() != null; - } catch (final UnsupportedVersionException e) { - return false; } catch (final Exception e) { + if (ExceptionUtils.indexOfType(e, UnsupportedVersionException.class) != -1) { + LOG.info("Received nested unsupported version error testing authorized operations api", e); + return false; + } throw new KsqlServerException("Could not get Kafka authorized operations!", e); } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactoryTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactoryTest.java index e55137bba93e..280d4a97073c 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactoryTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/security/KsqlAuthorizationValidatorFactoryTest.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; @@ -181,19 +182,32 @@ public void shouldReturnEmptyValidatorIfAuthorizedOperationsReturnNull() { @Test public void shouldReturnEmptyValidatorIfKafkaBrokerVersionTooLowButAuthorizerClassConfigIsSet() { // Given: - final Collection nodes = Collections.singletonList(node); - final DescribeClusterResult describeClusterResult = mock(DescribeClusterResult.class); - when(describeClusterResult.nodes()).thenReturn(KafkaFuture.completedFuture(nodes)); - when(adminClient.describeCluster()).thenReturn(describeClusterResult); + givenSingleNode(); + givenAuthorizerClass("a-class"); + when(adminClient.describeCluster(any())).thenThrow(new UnsupportedVersionException("too old")); - final DescribeConfigsResult describeConfigsResult = describeBrokerResult( - Collections.singletonList( - new ConfigEntry(KAFKA_AUTHORIZER_CLASS_NAME, "a-class") - ) + // When: + final Optional validator = KsqlAuthorizationValidatorFactory.create( + ksqlConfig, + serviceContext ); - when(adminClient.describeConfigs(describeBrokerRequest())) - .thenReturn(describeConfigsResult); - when(adminClient.describeCluster(any())).thenThrow(new UnsupportedVersionException("too old")); + + // Then + assertThat(validator, is(Optional.empty())); + } + + @Test + public void shouldReturnEmptyValidatorIfKafkaBrokerVersionTooLowAndExceptionWrapped() + throws InterruptedException, ExecutionException { + // Given: + givenSingleNode(); + givenAuthorizerClass("a-class"); + final KafkaFuture> authorized = mockAuthorizedOperationsFuture(); + final DescribeClusterResult result = mock(DescribeClusterResult.class); + when(adminClient.describeCluster(any())).thenReturn(result); + when(result.authorizedOperations()).thenReturn(authorized); + when(authorized.get()) + .thenThrow(new ExecutionException(new UnsupportedVersionException("too old"))); // When: final Optional validator = KsqlAuthorizationValidatorFactory.create( @@ -242,4 +256,26 @@ private DescribeConfigsResult describeBrokerResult(final List broke when(describeConfigsResult.all()).thenReturn(KafkaFuture.completedFuture(config)); return describeConfigsResult; } + + private void givenSingleNode() { + final Collection nodes = Collections.singletonList(node); + final DescribeClusterResult describeClusterResult = mock(DescribeClusterResult.class); + when(describeClusterResult.nodes()).thenReturn(KafkaFuture.completedFuture(nodes)); + when(adminClient.describeCluster()).thenReturn(describeClusterResult); + } + + private void givenAuthorizerClass(final String name) { + final DescribeConfigsResult describeConfigsResult = describeBrokerResult( + Collections.singletonList( + new ConfigEntry(KAFKA_AUTHORIZER_CLASS_NAME, name) + ) + ); + when(adminClient.describeConfigs(describeBrokerRequest())) + .thenReturn(describeConfigsResult); + } + + @SuppressWarnings("unchecked") + private KafkaFuture> mockAuthorizedOperationsFuture() { + return mock(KafkaFuture.class); + } }