From c3376e597b51cb198d73870d15c596550c09ed21 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 13 Mar 2023 19:36:38 +0100 Subject: [PATCH] [transactions] Implement KIP-664 DescribeProducers (#78) --- .../handlers/kop/KafkaCommandDecoder.java | 10 +- .../handlers/kop/KafkaRequestHandler.java | 96 ++++++++ .../handlers/kop/storage/PartitionLog.java | 26 ++- .../kop/storage/ProducerStateManager.java | 4 + .../handlers/kop/storage/ReplicaManager.java | 11 +- .../kop/KafkaProxyRequestHandler.java | 214 +++++++++++++++++- .../pulsar/handlers/kop/TransactionTest.java | 87 +++++-- 7 files changed, 420 insertions(+), 28 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index f67f5af524..4916fca6e3 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -320,6 +320,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case DESCRIBE_GROUPS: handleDescribeGroupRequest(kafkaHeaderAndRequest, responseFuture); break; + case DESCRIBE_PRODUCERS: + handleDescribeProducersRequest(kafkaHeaderAndRequest, responseFuture); + break; case LIST_GROUPS: handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture); break; @@ -572,7 +575,12 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, handleLeaveGroupRequest(KafkaHeaderAndRequest leaveGroup, CompletableFuture response); protected abstract void - handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup, CompletableFuture response); + handleDescribeGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, + CompletableFuture response); + + protected abstract void + handleDescribeProducersRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, + CompletableFuture response); protected abstract void handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 9c6f969d87..2f5ac5c8af 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -117,6 +117,7 @@ import org.apache.kafka.common.message.DescribeClusterResponseData; import org.apache.kafka.common.message.DescribeConfigsRequestData; import org.apache.kafka.common.message.DescribeConfigsResponseData; +import org.apache.kafka.common.message.DescribeProducersResponseData; import org.apache.kafka.common.message.DescribeTransactionsResponseData; import org.apache.kafka.common.message.EndTxnRequestData; import org.apache.kafka.common.message.EndTxnResponseData; @@ -162,6 +163,8 @@ import org.apache.kafka.common.requests.DescribeConfigsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeGroupsRequest; +import org.apache.kafka.common.requests.DescribeProducersRequest; +import org.apache.kafka.common.requests.DescribeProducersResponse; import org.apache.kafka.common.requests.DescribeTransactionsRequest; import org.apache.kafka.common.requests.DescribeTransactionsResponse; import org.apache.kafka.common.requests.EndTxnRequest; @@ -2020,6 +2023,99 @@ protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup, )); } + @Override + protected void handleDescribeProducersRequest(KafkaHeaderAndRequest describeGroup, + CompletableFuture responseFuture) { + // https://github.com/apache/kafka/blob/79c19da68d6a93a729a07dfdd37f238246653a46/ + // core/src/main/scala/kafka/server/KafkaApis.scala#L3397 + checkArgument(describeGroup.getRequest() instanceof DescribeProducersRequest); + DescribeProducersRequest request = (DescribeProducersRequest) describeGroup.getRequest(); + Map allResponses = Maps.newConcurrentMap(); + Map errors = Maps.newConcurrentMap(); + String namespacePrefix = currentNamespacePrefix(); + final int numPartitions = request.data().topics().stream() + .mapToInt(t->t.partitionIndexes().size()) + .sum(); + Runnable completeOne = () -> { + if (errors.size() + allResponses.size() != numPartitions) { + // not enough responses + return; + } + errors.forEach((topicPartition, tpErrors) -> { + DescribeProducersResponseData.PartitionResponse topicResponse = + new DescribeProducersResponseData.PartitionResponse() + .setPartitionIndex(topicPartition.partition()) + .setErrorCode(tpErrors.code()) + .setErrorMessage(tpErrors.message()); + allResponses.put(topicPartition, topicResponse); + }); + DescribeProducersResponseData response = new DescribeProducersResponseData(); + allResponses + .entrySet() + .stream() + .collect(Collectors.groupingBy( + entry -> entry.getKey().topic(), + Collectors.mapping( + entry -> entry.getValue(), + Collectors.toList() + ) + )) + .forEach((topic, partitionResponses) -> { + DescribeProducersResponseData.TopicResponse topicResponse = + new DescribeProducersResponseData.TopicResponse() + .setName(topic) + .setPartitions(partitionResponses); + response.topics().add(topicResponse); + }); + responseFuture.complete(new DescribeProducersResponse(response)); + }; + + request.data().topics().forEach ((topicRequest) -> { + topicRequest.partitionIndexes().forEach(partition -> { + TopicPartition tp = new TopicPartition(topicRequest.name(), partition); + String fullPartitionName; + try { + fullPartitionName = KopTopic.toString(tp, namespacePrefix); + } catch (KoPTopicException e) { + log.warn("Invalid topic name: {}", tp.topic(), e); + errors.put(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION); + completeOne.run(); + return; + } + authorize(AclOperation.WRITE, Resource.of(ResourceType.TOPIC, fullPartitionName)) + .whenComplete((isAuthorized, ex) -> { + if (ex != null) { + log.error("AddPartitionsToTxn topic authorize failed, topic - {}. {}", + fullPartitionName, ex.getMessage()); + errors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED); + completeOne.run(); + return; + } + if (!isAuthorized) { + errors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED); + completeOne.run(); + return; + } + CompletableFuture topicResponse = + replicaManager.activeProducerState(tp, namespacePrefix); + topicResponse.whenComplete((response, throwable) -> { + if (throwable != null) { + log.error("DescribeProducersRequest failed, topic - {}. {}", + fullPartitionName, throwable.getMessage()); + errors.put(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION); + } else { + allResponses.put(tp, response); + } + completeOne.run(); + }); + + }); + }); + }); + + + } + @Override protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture resultFuture) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java index 0e70b96969..1e76db58e6 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java @@ -78,6 +78,7 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.message.DescribeProducersResponseData; import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.protocol.Errors; @@ -153,7 +154,7 @@ public CompletableFuture getSchemaIds(String topic, BytesSche private volatile EntryFormatter entryFormatter; - private volatile AtomicBoolean unloaded = new AtomicBoolean(); + private final AtomicBoolean unloaded = new AtomicBoolean(); public PartitionLog(KafkaServiceConfiguration kafkaConfig, RequestStats requestStats, @@ -1187,6 +1188,29 @@ public CompletableFuture forcePurgeAbortTx() { }); } + public DescribeProducersResponseData.PartitionResponse activeProducerState() { + DescribeProducersResponseData.PartitionResponse producerState = + new DescribeProducersResponseData.PartitionResponse() + .setPartitionIndex(topicPartition.partition()) + .setErrorCode(Errors.NONE.code()) + .setActiveProducers(new ArrayList<>()); + + // this utility is only for monitoring, it is fine to access this structure directly from any thread + Map producers = producerStateManager.getProducers(); + producers.values().forEach(producerStateEntry -> { + producerState.activeProducers().add(new DescribeProducersResponseData.ProducerState() + .setProducerId(producerStateEntry.producerId()) + .setLastSequence(-1) // NOT HANDLED YET + .setProducerEpoch(producerStateEntry.producerEpoch() != null + ? producerStateEntry.producerEpoch().intValue() : -1) + .setLastTimestamp(producerStateEntry.lastTimestamp() != null + ? producerStateEntry.lastTimestamp().longValue() : -1) + .setCoordinatorEpoch(producerStateEntry.coordinatorEpoch()) + .setCurrentTxnStartOffset(producerStateEntry.currentTxnFirstOffset().orElse(-1L))); + }); + return producerState; + } + public CompletableFuture recoverTxEntries( long offset, diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java index 8c6f21ed76..c4696466a4 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java @@ -389,4 +389,8 @@ public void handleMissingDataBeforeRecovery(long minOffset, long snapshotOffset) mapEndOffset = -1; } } + + public Map getProducers() { + return producers; + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java index 4b22ed359b..4fdf115794 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java @@ -43,6 +43,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.message.DescribeProducersResponseData; import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; @@ -338,6 +339,14 @@ public CompletableFuture updatePurgeAbortedTxnsOffsets() { return logManager.updatePurgeAbortedTxnsOffsets(); } - + public CompletableFuture activeProducerState( + TopicPartition topicPartition, + String namespacePrefix) { + PartitionLog partitionLog = getPartitionLog(topicPartition, namespacePrefix); + // https://github.com/apache/kafka/blob/5514f372b3e12db1df35b257068f6bb5083111c7/ + // core/src/main/scala/kafka/server/ReplicaManager.scala#L535 + return partitionLog.awaitInitialisation() + .thenApply(log -> log.activeProducerState()); + } } diff --git a/proxy/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProxyRequestHandler.java b/proxy/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProxyRequestHandler.java index 2be8ab4cff..e043e32bf7 100644 --- a/proxy/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProxyRequestHandler.java +++ b/proxy/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProxyRequestHandler.java @@ -68,7 +68,9 @@ import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; +import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.AddOffsetsToTxnRequestData; import org.apache.kafka.common.message.AddPartitionsToTxnRequestData; @@ -82,6 +84,9 @@ import org.apache.kafka.common.message.DescribeClusterResponseData; import org.apache.kafka.common.message.DescribeConfigsRequestData; import org.apache.kafka.common.message.DescribeGroupsRequestData; +import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.DescribeProducersRequestData; +import org.apache.kafka.common.message.DescribeProducersResponseData; import org.apache.kafka.common.message.DescribeTransactionsRequestData; import org.apache.kafka.common.message.DescribeTransactionsResponseData; import org.apache.kafka.common.message.EndTxnRequestData; @@ -130,6 +135,9 @@ import org.apache.kafka.common.requests.DescribeClusterResponse; import org.apache.kafka.common.requests.DescribeConfigsRequest; import org.apache.kafka.common.requests.DescribeGroupsRequest; +import org.apache.kafka.common.requests.DescribeGroupsResponse; +import org.apache.kafka.common.requests.DescribeProducersRequest; +import org.apache.kafka.common.requests.DescribeProducersResponse; import org.apache.kafka.common.requests.DescribeTransactionsRequest; import org.apache.kafka.common.requests.DescribeTransactionsResponse; import org.apache.kafka.common.requests.EndTxnRequest; @@ -1111,14 +1119,206 @@ protected void handleLeaveGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndReque null); } + + private void sendRequestToAllTopicOwners( + KafkaHeaderAndRequest kafkaHeaderAndRequest, + CompletableFuture resultFuture, + Function> topicsExtractor, + Class requestClass, + Class requestDataClass, + BiFunction, K> cloneRequest, + Function, R> responseCollector, + BiFunction customErrorBuilder + ) { + resultFuture.whenComplete((response, ex) -> { + // in any case we need to close the request and release the buffer + // the original request is never sent on the wire + kafkaHeaderAndRequest.close(); + }); + BiFunction errorBuilder; + if (customErrorBuilder == null) { + errorBuilder = (K request, Throwable t) -> { + while (t instanceof CompletionException && t.getCause() != null) { + t = t.getCause(); + } + if (t instanceof IOException + || t.getCause() instanceof IOException) { + t = new NotLeaderOrFollowerException("Network error: " + t, t); + } + log.debug("Unexpected error", t); + return (R) request.getErrorResponse(t); + }; + } else { + errorBuilder = customErrorBuilder; + } + try { + checkArgument(requestClass.isInstance(kafkaHeaderAndRequest.getRequest())); + K request = (K) kafkaHeaderAndRequest.getRequest(); + checkArgument(requestDataClass.isInstance(request.data())); + + Map> keysByBroker = new ConcurrentHashMap<>(); + List> findBrokers = new ArrayList<>(); + List keys = topicsExtractor.apply((V) request.data()); + String namespacePrefix = currentNamespacePrefix(); + final String metadataNamespace = kafkaConfig.getKafkaMetadataNamespace(); + for (TopicPartition topicPartition : keys) { + final String fullPartitionName = KopTopic.toString(topicPartition, namespacePrefix); + // check KOP inner topic + if (KopTopic.isInternalTopic(metadataNamespace, fullPartitionName)) { + resultFuture.complete(errorBuilder.apply(request, new InvalidTopicException( + "Topic " + fullPartitionName + " is not allowed to be accessed"))); + return; + } + findBrokers.add(findBroker(TopicName.get(fullPartitionName), true) + .thenApply(m -> { + keysByBroker.compute(m.node, (k, currentList) -> { + if (currentList == null) { + currentList = new CopyOnWriteArrayList<>(); + } + currentList.add(topicPartition); + return currentList; + }); + return m.node; + })); + } + + + CompletableFuture> cc = FutureUtil.waitForAll(findBrokers) + .thenApply(__ -> { + Set distinct = new HashSet<>(); + for (CompletableFuture f : findBrokers) { + distinct.add(f.join()); + } + return distinct; + }); + + + CompletableFuture finalResult = cc.thenCompose(coordinators -> { + List> futures = new CopyOnWriteArrayList<>(); + for (Node node : coordinators) { + List keysForBroker = keysByBroker.get(node); + CompletableFuture responseFromBroker = new CompletableFuture<>(); + futures.add(responseFromBroker); + KafkaHeaderAndRequest requestWithNewHeader = + executeCloneRequest(kafkaHeaderAndRequest, cloneRequest, keysForBroker); + grabConnectionToBroker(node.host(), node.port()) + .forwardRequest(requestWithNewHeader) + .thenAccept(serverResponse -> { + if (!isNoisyRequest(request)) { + log.info("Response {} from broker {}:{} errors {}.", serverResponse, + node.host(), node.port(), + serverResponse.errorCounts()); + } + if (serverResponse.errorCounts() != null) { + for (Errors error : serverResponse.errorCounts().keySet()) { + if (error == Errors.NOT_COORDINATOR + || error == Errors.NOT_CONTROLLER + || error == Errors.COORDINATOR_NOT_AVAILABLE + || error == Errors.NOT_LEADER_OR_FOLLOWER) { + forgetMetadataForFailedBroker(node.host(), + node.port()); + } + } + } + responseFromBroker.complete((R) serverResponse); + }).exceptionally(err -> { + log.error("Error sending coordinator request to {} :{}", + node, err); + responseFromBroker.complete(errorBuilder.apply(request, err)); + return null; + }); + } + return FutureUtil.waitForAll(futures).thenApply(responses -> { + log.info("Got responses from all brokers {}", responses); + List responseList = new ArrayList<>(); + for (CompletableFuture response : futures) { + responseList.add(response.join()); + } + return responseCollector.apply(responseList); + }); + }); + + finalResult.whenComplete((r, ex) -> { + if (ex != null) { + log.error("Error sending request to all coordinators", ex); + resultFuture.complete(errorBuilder.apply(request, ex)); + } else { + resultFuture.complete(r); + } + }); + + + } catch (RuntimeException err) { + log.error("Runtime error " + err, err); + resultFuture.completeExceptionally(err); + } + } + @Override protected void handleDescribeGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, - CompletableFuture resultFuture) { - // forward to the coordinator of the first group - handleRequestWithCoordinator(kafkaHeaderAndRequest, resultFuture, FindCoordinatorRequest.CoordinatorType.GROUP, + CompletableFuture response) { + this. + sendRequestToAllCoordinators(kafkaHeaderAndRequest, response, + FindCoordinatorRequest.CoordinatorType.GROUP, + (DescribeGroupsRequestData data) -> data.groups(), DescribeGroupsRequest.class, DescribeGroupsRequestData.class, - (DescribeGroupsRequestData r) -> r.groups().get(0), + (DescribeGroupsRequest describeGroupsRequest, List keys) -> { + DescribeGroupsRequestData data = new DescribeGroupsRequestData() + .setGroups(keys); + return new DescribeGroupsRequest.Builder(data).build(describeGroupsRequest.version()); + }, + (allResponses) -> { + DescribeGroupsResponseData responseData = new DescribeGroupsResponseData(); + responseData.setGroups(allResponses + .stream() + .flatMap(d->d.data().groups().stream()) + .collect(Collectors.toList())); + return new DescribeGroupsResponse(responseData); + }, + null); + } + + @Override + protected void handleDescribeProducersRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, + CompletableFuture response) { + this. + sendRequestToAllTopicOwners(kafkaHeaderAndRequest, response, + (DescribeProducersRequestData data) -> { + List topics = new ArrayList<>(); + data.topics().forEach(t-> { + t.partitionIndexes().forEach(index -> { + topics.add(new TopicPartition(t.name(), index)); + }); + }); + return topics; + }, + DescribeProducersRequest.class, + DescribeProducersRequestData.class, + (DescribeProducersRequest describeProducersRequest, List keys) -> { + DescribeProducersRequestData data = new DescribeProducersRequestData() + .setTopics(keys.stream() + .collect(Collectors.groupingBy(TopicPartition::topic)) + .entrySet() + .stream() + .map(e -> new DescribeProducersRequestData.TopicRequest() + .setName(e.getKey()) + .setPartitionIndexes(e.getValue().stream() + .map(TopicPartition::partition) + .collect(Collectors.toList()))) + .collect(Collectors.toList())); + return new DescribeProducersRequest.Builder(data).build(describeProducersRequest.version()); + }, + (allResponses) -> { + DescribeProducersResponseData responseData = new DescribeProducersResponseData(); + responseData.setTopics(allResponses + .stream() + .flatMap(d->d.data().topics().stream()) + .collect(Collectors.toList())); + return new DescribeProducersResponse(responseData); + }, null); } @@ -2298,9 +2498,9 @@ R extends AbstractResponse> void sendRequestToAllCoordinators( } } - private KafkaHeaderAndRequest executeCloneRequest( - KafkaHeaderAndRequest kafkaHeaderAndRequest, BiFunction, K> cloneRequest, - List keys) { + private KafkaHeaderAndRequest executeCloneRequest( + KafkaHeaderAndRequest kafkaHeaderAndRequest, BiFunction, K> cloneRequest, + List keys) { int dummyCorrelationId = getDummyCorrelationId(); RequestHeader header = new RequestHeader( kafkaHeaderAndRequest.getHeader().apiKey(), diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java index 7a6dae97dd..d9f8d0ca72 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java @@ -49,13 +49,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import java.util.function.Function; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeProducersResult; import org.apache.kafka.clients.admin.ListTransactionsOptions; import org.apache.kafka.clients.admin.ListTransactionsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.ProducerState; import org.apache.kafka.clients.admin.TransactionDescription; import org.apache.kafka.clients.admin.TransactionListing; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -1528,47 +1531,63 @@ public void testListAndDescribeTransactions() throws Exception { producer.initTransactions(); producer.beginTransaction(); assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.EMPTY); + org.apache.kafka.clients.admin.TransactionState.EMPTY, (stateOnBroker, stateOnCoodinator) -> { + assertNull(stateOnBroker); + }); producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); producer.flush(); - ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); - listTransactionsResult.all().get().forEach(t -> { - log.info("Found transactionalId: {} {} {}", - t.transactionalId(), - t.producerId(), - t.state()); - }); + // the transaction is in ONGOING state assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.ONGOING); + org.apache.kafka.clients.admin.TransactionState.ONGOING, + (stateOnBroker, stateOnCoodinator) -> {}); + + // wait for the brokers to update the state Awaitility.await().untilAsserted(() -> { - assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.ONGOING); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.ONGOING, + (stateOnBroker, stateOnCoodinator) -> { + // THESE ASSERTIONS ARE NOT VALID YET + //log.info("stateOnBroker: {}", stateOnBroker); + //log.info("stateOnCoodinator: {}", stateOnCoodinator); + // assertTrue(stateOnBroker.lastTimestamp() + // >= stateOnCoodinator.transactionStartTimeMs().orElseThrow()); + }); }); producer.commitTransaction(); Awaitility.await().untilAsserted(() -> { - assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); - }); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT, + (stateOnBroker, stateOnCoodinator) -> { + }); + }); producer.beginTransaction(); assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); + org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT, + (stateOnBroker, stateOnCoodinator) -> { + }); producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); producer.flush(); producer.abortTransaction(); Awaitility.await().untilAsserted(() -> { assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); + org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT, + (stateOnBroker, stateOnCoodinator) -> { + }); }); producer.close(); assertTransactionState(kafkaAdmin, transactionalId, - org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); + org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT, + (stateOnBroker, stateOnCoodinator) -> { + }); } private static void assertTransactionState(AdminClient kafkaAdmin, String transactionalId, - org.apache.kafka.clients.admin.TransactionState transactionState) + org.apache.kafka.clients.admin.TransactionState transactionState, + BiConsumer + producerStateValidator) throws Exception { ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); Collection transactionListings = listTransactionsResult.all().get(); @@ -1643,12 +1662,44 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa assertEquals(0, transactionDescription.topicPartitions().size()); break; case ONGOING: + assertTrue(transactionDescription.transactionStartTimeMs().orElseThrow() > 0); assertEquals(1, transactionDescription.topicPartitions().size()); break; default: fail("unhandled " + transactionState); } + DescribeProducersResult producers = kafkaAdmin.describeProducers(transactionDescription.topicPartitions()); + Map topicPartitionPartitionProducerStateMap = + producers.all().get(); + log.debug("topicPartitionPartitionProducerStateMap {}", topicPartitionPartitionProducerStateMap); + + + switch (transactionState) { + case EMPTY: + case COMPLETE_COMMIT: + case COMPLETE_ABORT: + producerStateValidator.accept(null, transactionDescription); + assertEquals(0, topicPartitionPartitionProducerStateMap.size()); + break; + case ONGOING: + assertEquals(1, topicPartitionPartitionProducerStateMap.size()); + TopicPartition tp = transactionDescription.topicPartitions().iterator().next(); + DescribeProducersResult.PartitionProducerState partitionProducerState = + topicPartitionPartitionProducerStateMap.get(tp); + List producerStates = partitionProducerState.activeProducers(); + assertEquals(1, producerStates.size()); + ProducerState producerState = producerStates.get(0); + assertEquals(producerState.producerId(), transactionDescription.producerId()); + producerStateValidator.accept(producerState, transactionDescription); + + + break; + default: + fail("unhandled " + transactionState); + } + + } /**