Skip to content

Commit

Permalink
[transactions] Implement KIP-664 DescribeProducers (streamnative#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Mar 13, 2023
1 parent 1f2fe99 commit c3376e5
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
case DESCRIBE_GROUPS:
handleDescribeGroupRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DESCRIBE_PRODUCERS:
handleDescribeProducersRequest(kafkaHeaderAndRequest, responseFuture);
break;
case LIST_GROUPS:
handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture);
break;
Expand Down Expand Up @@ -572,7 +575,12 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
handleLeaveGroupRequest(KafkaHeaderAndRequest leaveGroup, CompletableFuture<AbstractResponse> response);

protected abstract void
handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup, CompletableFuture<AbstractResponse> response);
handleDescribeGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest,
CompletableFuture<AbstractResponse> response);

protected abstract void
handleDescribeProducersRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest,
CompletableFuture<AbstractResponse> response);

protected abstract void
handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.EndTxnRequestData;
import org.apache.kafka.common.message.EndTxnResponseData;
Expand Down Expand Up @@ -162,6 +163,8 @@
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeProducersRequest;
import org.apache.kafka.common.requests.DescribeProducersResponse;
import org.apache.kafka.common.requests.DescribeTransactionsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
Expand Down Expand Up @@ -2020,6 +2023,99 @@ protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup,
));
}

@Override
protected void handleDescribeProducersRequest(KafkaHeaderAndRequest describeProducers,
                                              CompletableFuture<AbstractResponse> responseFuture) {
    // KIP-664 DescribeProducers. Modelled on upstream Kafka:
    // https://github.com/apache/kafka/blob/79c19da68d6a93a729a07dfdd37f238246653a46/
    // core/src/main/scala/kafka/server/KafkaApis.scala#L3397
    checkArgument(describeProducers.getRequest() instanceof DescribeProducersRequest);
    DescribeProducersRequest request = (DescribeProducersRequest) describeProducers.getRequest();
    // Concurrent maps: the per-partition callbacks below may complete on different threads.
    // A partition ends up in exactly one of the two maps.
    Map<TopicPartition, DescribeProducersResponseData.PartitionResponse> allResponses = Maps.newConcurrentMap();
    Map<TopicPartition, Errors> errors = Maps.newConcurrentMap();
    String namespacePrefix = currentNamespacePrefix();
    final int numPartitions = request.data().topics().stream()
            .mapToInt(t -> t.partitionIndexes().size())
            .sum();
    // Invoked once per finished partition; assembles and completes the response only
    // when every requested partition has either a result or an error. Completing the
    // future more than once is harmless (CompletableFuture.complete is idempotent).
    Runnable completeOne = () -> {
        if (errors.size() + allResponses.size() != numPartitions) {
            // Not all partitions have reported yet.
            return;
        }
        // Convert per-partition errors into error-carrying PartitionResponses.
        errors.forEach((topicPartition, tpError) -> {
            DescribeProducersResponseData.PartitionResponse topicResponse =
                    new DescribeProducersResponseData.PartitionResponse()
                            .setPartitionIndex(topicPartition.partition())
                            .setErrorCode(tpError.code())
                            .setErrorMessage(tpError.message());
            allResponses.put(topicPartition, topicResponse);
        });
        // Group the flat partition map back into the per-topic wire format.
        DescribeProducersResponseData response = new DescribeProducersResponseData();
        allResponses
                .entrySet()
                .stream()
                .collect(Collectors.groupingBy(
                        entry -> entry.getKey().topic(),
                        Collectors.mapping(
                                Map.Entry::getValue,
                                Collectors.toList()
                        )
                ))
                .forEach((topic, partitionResponses) -> {
                    DescribeProducersResponseData.TopicResponse topicResponse =
                            new DescribeProducersResponseData.TopicResponse()
                                    .setName(topic)
                                    .setPartitions(partitionResponses);
                    response.topics().add(topicResponse);
                });
        responseFuture.complete(new DescribeProducersResponse(response));
    };

    request.data().topics().forEach((topicRequest) -> {
        topicRequest.partitionIndexes().forEach(partition -> {
            TopicPartition tp = new TopicPartition(topicRequest.name(), partition);
            String fullPartitionName;
            try {
                fullPartitionName = KopTopic.toString(tp, namespacePrefix);
            } catch (KoPTopicException e) {
                log.warn("Invalid topic name: {}", tp.topic(), e);
                errors.put(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION);
                completeOne.run();
                return;
            }
            // NOTE(review): upstream Kafka authorizes DescribeProducers with READ on the
            // topic, not WRITE — confirm whether WRITE is intentional here.
            authorize(AclOperation.WRITE, Resource.of(ResourceType.TOPIC, fullPartitionName))
                    .whenComplete((isAuthorized, ex) -> {
                        if (ex != null) {
                            log.error("DescribeProducers topic authorize failed, topic - {}. {}",
                                    fullPartitionName, ex.getMessage());
                            errors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED);
                            completeOne.run();
                            return;
                        }
                        if (!isAuthorized) {
                            errors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED);
                            completeOne.run();
                            return;
                        }
                        // Authorized: query the live producer state for this partition.
                        CompletableFuture<DescribeProducersResponseData.PartitionResponse> topicResponse =
                                replicaManager.activeProducerState(tp, namespacePrefix);
                        topicResponse.whenComplete((response, throwable) -> {
                            if (throwable != null) {
                                log.error("DescribeProducersRequest failed, topic - {}. {}",
                                        fullPartitionName, throwable.getMessage());
                                errors.put(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION);
                            } else {
                                allResponses.put(tp, response);
                            }
                            completeOne.run();
                        });
                    });
        });
    });
}

@Override
protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups,
CompletableFuture<AbstractResponse> resultFuture) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
Expand Down Expand Up @@ -153,7 +154,7 @@ public CompletableFuture<KeyValueSchemaIds> getSchemaIds(String topic, BytesSche

private volatile EntryFormatter entryFormatter;

private volatile AtomicBoolean unloaded = new AtomicBoolean();
private final AtomicBoolean unloaded = new AtomicBoolean();

public PartitionLog(KafkaServiceConfiguration kafkaConfig,
RequestStats requestStats,
Expand Down Expand Up @@ -1187,6 +1188,29 @@ public CompletableFuture<Long> forcePurgeAbortTx() {
});
}

/**
 * Builds a snapshot of the active (idempotent/transactional) producers on this
 * partition in the KIP-664 DescribeProducers wire format.
 *
 * <p>This utility is only for monitoring, so it is fine to read the producer
 * state map directly from any thread.
 *
 * @return a PartitionResponse with error code NONE and one ProducerState entry
 *         per known producer; lastSequence is not tracked yet and reported as -1
 */
public DescribeProducersResponseData.PartitionResponse activeProducerState() {
    DescribeProducersResponseData.PartitionResponse partitionResponse =
            new DescribeProducersResponseData.PartitionResponse()
                    .setPartitionIndex(topicPartition.partition())
                    .setErrorCode(Errors.NONE.code())
                    .setActiveProducers(new ArrayList<>());

    for (ProducerStateEntry entry : producerStateManager.getProducers().values()) {
        DescribeProducersResponseData.ProducerState state =
                new DescribeProducersResponseData.ProducerState()
                        .setProducerId(entry.producerId())
                        .setLastSequence(-1) // NOT HANDLED YET
                        .setProducerEpoch(entry.producerEpoch() == null
                                ? -1 : entry.producerEpoch().intValue())
                        .setLastTimestamp(entry.lastTimestamp() == null
                                ? -1 : entry.lastTimestamp().longValue())
                        .setCoordinatorEpoch(entry.coordinatorEpoch())
                        .setCurrentTxnStartOffset(entry.currentTxnFirstOffset().orElse(-1L));
        partitionResponse.activeProducers().add(state);
    }
    return partitionResponse;
}


public CompletableFuture<Long> recoverTxEntries(
long offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,4 +389,8 @@ public void handleMissingDataBeforeRecovery(long minOffset, long snapshotOffset)
mapEndOffset = -1;
}
}

/**
 * Returns the live map of producerId to producer state entry, used by
 * DescribeProducers (KIP-664) monitoring to report active producers.
 *
 * <p>NOTE(review): this exposes the internal mutable map directly; callers
 * must treat it as read-only. Consider returning an unmodifiable view.
 */
public Map<Long, ProducerStateEntry> getProducers() {
return producers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
Expand Down Expand Up @@ -338,6 +339,14 @@ public CompletableFuture<?> updatePurgeAbortedTxnsOffsets() {
return logManager.updatePurgeAbortedTxnsOffsets();
}


public CompletableFuture<DescribeProducersResponseData.PartitionResponse> activeProducerState(
TopicPartition topicPartition,
String namespacePrefix) {
PartitionLog partitionLog = getPartitionLog(topicPartition, namespacePrefix);
// https://github.com/apache/kafka/blob/5514f372b3e12db1df35b257068f6bb5083111c7/
// core/src/main/scala/kafka/server/ReplicaManager.scala#L535
return partitionLog.awaitInitialisation()
.thenApply(log -> log.activeProducerState());
}

}
Loading

0 comments on commit c3376e5

Please sign in to comment.