Skip to content

Commit

Permalink
replace deprecated methods
Browse files Browse the repository at this point in the history
  • Loading branch information
wayneguow committed Jun 13, 2024
1 parent ea2bca7 commit 074bac9
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@ private[kafka010] sealed trait ConsumerStrategy extends Logging {
.build()

protected def retrieveAllPartitions(admin: Admin, topics: Set[String]): Set[TopicPartition] = {
admin.describeTopics(topics.asJava).all().get().asScala.filterNot(_._2.isInternal).flatMap {
case (topic, topicDescription) =>
topicDescription.partitions().asScala.map { topicPartitionInfo =>
val partition = topicPartitionInfo.partition()
logDebug(s"Partition found: $topic:$partition")
new TopicPartition(topic, partition)
}
}.toSet
admin.describeTopics(topics.asJava).allTopicNames().get().asScala
.filterNot(_._2.isInternal)
.flatMap {
case (topic, topicDescription) =>
topicDescription.partitions().asScala.map { topicPartitionInfo =>
val partition = topicPartitionInfo.partition()
logDebug(s"Partition found: $topic:$partition")
new TopicPartition(topic, partition)
}
}.toSet
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ class KafkaTestUtils(
}

private def getOffsets(topics: Set[String], offsetSpec: OffsetSpec): Map[TopicPartition, Long] = {
val listOffsetsParams = adminClient.describeTopics(topics.asJava).all().get().asScala
val listOffsetsParams = adminClient.describeTopics(topics.asJava).allTopicNames().get().asScala
.flatMap { topicDescription =>
topicDescription._2.partitions().asScala.map { topicPartitionInfo =>
new TopicPartition(topicDescription._1, topicPartitionInfo.partition())
Expand Down

0 comments on commit 074bac9

Please sign in to comment.