From 397cd27e3b0b59b8d151a4c697fe98195ae90f57 Mon Sep 17 00:00:00 2001 From: Justin Chase Date: Sun, 12 Mar 2023 17:12:39 -0500 Subject: [PATCH] fix(topicdata): Use the partition leader from partition info (#1388) relate to #657 #1364 --- src/main/java/org/akhq/models/Partition.java | 28 +++++++++++++------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/akhq/models/Partition.java b/src/main/java/org/akhq/models/Partition.java index 31036ab8a..97b237da2 100644 --- a/src/main/java/org/akhq/models/Partition.java +++ b/src/main/java/org/akhq/models/Partition.java @@ -13,6 +13,7 @@ @Getter @NoArgsConstructor public class Partition { + private Node.Partition leader; private int id; private String topic; private List nodes; @@ -27,27 +28,36 @@ public Partition(String topic, TopicPartitionInfo partitionInfo, List lo this.firstOffset = offsets.getFirstOffset(); this.lastOffset = offsets.getLastOffset(); this.nodes = new ArrayList<>(); - for (org.apache.kafka.common.Node replica : partitionInfo.replicas()) { - nodes.add(new Node.Partition( + Node.Partition partition = new Node.Partition( replica, partitionInfo.leader().id() == replica.id(), partitionInfo.isr().stream().anyMatch(node -> node.id() == replica.id()) - )); + ); + + this.nodes.add(partition); + if (partition.isLeader()) { + this.leader = partition; + } + } + + if (this.leader == null) { + org.apache.kafka.common.Node leader = partitionInfo.leader(); + this.leader = new Node.Partition( + leader, + true, + partitionInfo.isr().stream().anyMatch(node -> node.id() == leader.id()) + ); } } public Node.Partition getLeader() { - return nodes - .stream() - .filter(Node.Partition::isLeader) - .findFirst() - .orElseThrow(() -> new NoSuchElementException("Leader not found")); + return this.leader; } public long getLogDirSize() { return this.getLogDir().stream() - .filter(logDir -> logDir.getBrokerId() == this.getLeader().getId()) + .filter(logDir -> this.leader != null && logDir.getBrokerId() == this.leader.getId()) .map(LogDir::getSize) .reduce(0L, Long::sum); }