
KIP 396 ListOffsets #1029

Merged 12 commits on Oct 23, 2023
1 change: 1 addition & 0 deletions examples/.gitignore
@@ -12,6 +12,7 @@ admin_incremental_alter_configs/admin_incremental_alter_configs
admin_describe_consumer_groups/admin_describe_consumer_groups
admin_list_consumer_groups/admin_list_consumer_groups
admin_list_consumer_group_offsets/admin_list_consumer_group_offsets
admin_list_offsets/admin_list_offsets
admin_describe_user_scram_credentials/admin_describe_user_scram_credentials
admin_alter_user_scram_credentials/admin_alter_user_scram_credentials
avro_generic_consumer_example/avro_generic_consumer_example
30 changes: 16 additions & 14 deletions examples/README.md
@@ -5,13 +5,13 @@ Examples
[admin_alter_consumer_group_offsets](admin_alter_consumer_group_offsets) - Alter Consumer Group Offsets

[admin_create_acls](admin_create_acls) - Create Access Control Lists

[admin_create_topic](admin_create_topic) - Create a topic

[admin_delete_acls](admin_delete_acls) - Delete Access Control Lists using different filters

[admin_delete_topics](admin_delete_topics) - Delete some topics

[admin_delete_consumer_groups](admin_delete_consumer_groups) - Delete consumer groups

[admin_delete_topics](admin_delete_topics) - Delete topics
@@ -28,18 +28,20 @@ Examples

[admin_list_consumer_group_offsets](admin_list_consumer_group_offsets) - List consumer group offsets

[admin_list_offsets](admin_list_offsets) - List partition offsets

[admin_list_consumer_groups](admin_list_consumer_groups) - List consumer groups

[avro_generic_consumer_example](avro_generic_consumer_example) - consumer with Schema Registry and Avro Generic Deserializer

[avro_generic_producer_example](avro_generic_producer_example) - producer with Schema Registry and Avro Generic Serializer

[avro_specific_consumer_example](avro_specific_consumer_example) - consumer with Schema Registry and Avro Specific Deserializer

[avro_specific_producer_example](avro_specific_producer_example) - producer with Schema Registry and Avro Specific Serializer

[consumer_example](consumer_example) - Function & callback based consumer

[consumer_offset_metadata](consumer_offset_metadata) - Commit offset with metadata

[consumer_rebalance_example](consumer_rebalance_example) - Use of rebalance callback with manual commit
@@ -53,29 +55,29 @@ Examples
[idempotent_producer_example](idempotent_producer_example) - Idempotent producer

[json_consumer_example](json_consumer_example) - consumer with Schema Registry and JSON Schema Deserializer

[json_producer_example](json_producer_example) - producer with Schema Registry and JSON Schema Serializer

[legacy](legacy) - Legacy examples

[library-version](library-version) - Show the library version

[mockcluster_example](mockcluster_example) - Use a mock cluster for testing

[mockcluster_failure_example](mockcluster_failure_example) - Use a mock cluster for failure testing

[oauthbearer_consumer_example](oauthbearer_consumer_example) - Unsecured SASL/OAUTHBEARER consumer example

[oauthbearer_oidc_example](oauthbearer_oidc_example) - SASL/OAUTHBEARER with OIDC method example

[oauthbearer_producer_example](oauthbearer_producer_example) - Unsecured SASL/OAUTHBEARER producer example

[producer_custom_channel_example](producer_custom_channel_example) - Function based producer with a custom delivery channel

[producer_example](producer_example) - Function based producer

[protobuf_consumer_example](protobuf_consumer_example) - consumer with Schema Registry and Protocol Buffers Deserializer

[protobuf_producer_example](protobuf_producer_example) - producer with Schema Registry and Protocol Buffers Serializer

[stats_example](stats_example) - Receiving stats events
117 changes: 117 additions & 0 deletions examples/admin_list_offsets/admin_list_offsets.go
@@ -0,0 +1,117 @@
/**
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// List Offsets example
package main

import (
"context"
"fmt"
"os"
"strconv"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

	if len(os.Args) < 5 {
		fmt.Fprintf(os.Stderr,
			"Usage: %s <bootstrap-servers> <topicname> <partition> <EARLIEST/LATEST/MAX_TIMESTAMP/TIMESTAMP t1> ..\n", os.Args[0])
		os.Exit(1)
	}

bootstrapServers := os.Args[1]

	argsCnt := len(os.Args)
	i := 2
	index := 0
	topicPartitionOffsets := make(map[kafka.TopicPartition]kafka.OffsetSpec)
	// Parse the remaining arguments as (topic, partition, spec) triples;
	// a TIMESTAMP spec consumes one extra argument, the timestamp itself.
	for i < argsCnt {
if i+3 > argsCnt {
fmt.Printf("Expected %d arguments for partition %d, got %d\n", 3, index, argsCnt-i)
os.Exit(1)
}

topicName := os.Args[i]
partition, err := strconv.Atoi(os.Args[i+1])
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid partition: %s\n", err)
os.Exit(1)
}

tp := kafka.TopicPartition{Topic: &topicName, Partition: int32(partition)}

if os.Args[i+2] == "EARLIEST" {
topicPartitionOffsets[tp] = kafka.EarliestOffsetSpec
} else if os.Args[i+2] == "LATEST" {
topicPartitionOffsets[tp] = kafka.LatestOffsetSpec
} else if os.Args[i+2] == "MAX_TIMESTAMP" {
topicPartitionOffsets[tp] = kafka.MaxTimestampOffsetSpec
} else if os.Args[i+2] == "TIMESTAMP" {
if i+4 > argsCnt {
fmt.Printf("Expected %d arguments for partition %d, got %d\n", 4, index, argsCnt-i)
os.Exit(1)
}

timestamp, timestampErr := strconv.Atoi(os.Args[i+3])
if timestampErr != nil {
fmt.Fprintf(os.Stderr, "Invalid timestamp: %s\n", timestampErr)
os.Exit(1)
}
topicPartitionOffsets[tp] = kafka.NewOffsetSpecForTimestamp(int64(timestamp))
			i = i + 1 // consume the extra timestamp argument
} else {
fmt.Fprintf(os.Stderr, "Invalid OffsetSpec.\n")
os.Exit(1)
}
i = i + 3
index++
}

// Create a new AdminClient.
// AdminClient can also be instantiated using an existing
// Producer or Consumer instance, see NewAdminClientFromProducer and
// NewAdminClientFromConsumer.
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer a.Close()

// Contexts are used to abort or limit the amount of time
// the Admin call blocks waiting for a result.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

results, err := a.ListOffsets(ctx, topicPartitionOffsets,
kafka.SetAdminIsolationLevel(kafka.IsolationLevelReadCommitted))
if err != nil {
fmt.Printf("Failed to List offsets: %v\n", err)
os.Exit(1)
}
	// Print the per-partition results (map[TopicPartition]ListOffsetsResultInfo).
for tp, info := range results.ResultsInfos {
fmt.Printf("Topic: %s Partition: %d\n", *tp.Topic, tp.Partition)
if info.Error.Code() != kafka.ErrNoError {
fmt.Printf(" ErrorCode: %d ErrorMessage: %s\n\n", info.Error.Code(), info.Error.String())
} else {
fmt.Printf(" Offset: %d Timestamp: %d\n\n", info.Offset, info.Timestamp)
}
}
}
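For reference, a hypothetical invocation of this example (broker address, topic name, and timestamp are placeholders, not taken from the PR) could be:

go run admin_list_offsets.go localhost:9092 mytopic 0 LATEST mytopic 1 TIMESTAMP 1698019200000

This requests the latest offset for partition 0 and, for partition 1, the earliest offset whose timestamp is at or after the given millisecond timestamp.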
128 changes: 128 additions & 0 deletions kafka/adminapi.go
@@ -132,6 +132,13 @@ AlterUserScramCredentials_result_response_by_idx(const rd_kafka_AlterUserScramCr
return responses[idx];
}

/* Returns the ListOffsetsResultInfo at the given index, or NULL when idx is
 * out of bounds, so Go code can iterate the C pointer array safely. */
static const rd_kafka_ListOffsetsResultInfo_t *
ListOffsetsResultInfo_by_idx(const rd_kafka_ListOffsetsResultInfo_t **result_infos, size_t cnt, size_t idx) {
if (idx >= cnt)
return NULL;
return result_infos[idx];
}

static const rd_kafka_error_t *
error_by_idx(const rd_kafka_error_t **errors, size_t cnt, size_t idx) {
if (idx >= cnt)
@@ -988,6 +995,36 @@ type AlterUserScramCredentialsResult struct {
Errors map[string]Error
}

// OffsetSpec specifies desired offsets while using ListOffsets.
type OffsetSpec int64

const (
// MaxTimestampOffsetSpec is used to describe the offset with the maximum timestamp, which may differ from LatestOffsetSpec because timestamps can be set client-side.
MaxTimestampOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP)
// EarliestOffsetSpec is used to describe the earliest offset for the TopicPartition.
EarliestOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_EARLIEST)
// LatestOffsetSpec is used to describe the latest offset for the TopicPartition.
LatestOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_LATEST)
)

// NewOffsetSpecForTimestamp creates an OffsetSpec corresponding to the given timestamp (milliseconds since the epoch).
func NewOffsetSpecForTimestamp(timestamp int64) OffsetSpec {
return OffsetSpec(timestamp)
}
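As a caller-side sketch of how a request map is assembled from these specs (assuming the github.com/confluentinc/confluent-kafka-go/v2/kafka import; the topic name, partitions, timestamp, and helper name buildSpecs are illustrative, not part of the API):

// buildSpecs is a hypothetical helper that combines a fixed spec with a
// timestamp-based spec (milliseconds since the epoch).
func buildSpecs(topic string) map[kafka.TopicPartition]kafka.OffsetSpec {
	return map[kafka.TopicPartition]kafka.OffsetSpec{
		{Topic: &topic, Partition: 0}: kafka.EarliestOffsetSpec,
		{Topic: &topic, Partition: 1}: kafka.NewOffsetSpecForTimestamp(1698019200000),
	}
}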

// ListOffsetsResultInfo describes the result of ListOffsets request for a Topic Partition.
type ListOffsetsResultInfo struct {
Offset Offset
Timestamp int64
LeaderEpoch *int32
Error Error
}

// ListOffsetsResult holds the map of TopicPartition to ListOffsetsResultInfo for a request.
type ListOffsetsResult struct {
ResultsInfos map[TopicPartition]ListOffsetsResultInfo
}
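LeaderEpoch is a pointer and stays nil when no epoch was reported (see cToListOffsetsResult below), so callers should guard the dereference; a caller-side sketch with a hypothetical helper name:

// leaderEpochOrDefault returns the reported leader epoch, or -1 when the
// broker did not supply one (nil pointer).
func leaderEpochOrDefault(info kafka.ListOffsetsResultInfo) int32 {
	if info.LeaderEpoch == nil {
		return -1
	}
	return *info.LeaderEpoch
}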

// waitResult waits for a result event on cQueue or the ctx to be cancelled, whichever happens
// first.
// The returned result event is checked for errors; its error is returned if set.
@@ -1380,6 +1417,30 @@ func cToDescribeUserScramCredentialsResult(
return result
}

// cToListOffsetsResult converts a C
// rd_kafka_ListOffsets_result_t to a Go ListOffsetsResult
func cToListOffsetsResult(cRes *C.rd_kafka_ListOffsets_result_t) (result ListOffsetsResult) {
result = ListOffsetsResult{ResultsInfos: make(map[TopicPartition]ListOffsetsResultInfo)}
var cPartitionCount C.size_t
cResultInfos := C.rd_kafka_ListOffsets_result_infos(cRes, &cPartitionCount)
for itr := 0; itr < int(cPartitionCount); itr++ {
cResultInfo := C.ListOffsetsResultInfo_by_idx(cResultInfos, cPartitionCount, C.size_t(itr))
resultInfo := ListOffsetsResultInfo{}
cPartition := C.rd_kafka_ListOffsetsResultInfo_topic_partition(cResultInfo)
Topic := C.GoString(cPartition.topic)
Partition := TopicPartition{Topic: &Topic, Partition: int32(cPartition.partition)}
resultInfo.Offset = Offset(cPartition.offset)
resultInfo.Timestamp = int64(C.rd_kafka_ListOffsetsResultInfo_timestamp(cResultInfo))
cLeaderEpoch := int32(C.rd_kafka_topic_partition_get_leader_epoch(cPartition))
if cLeaderEpoch >= 0 {
resultInfo.LeaderEpoch = &cLeaderEpoch
}
resultInfo.Error = newError(cPartition.err)
result.ResultsInfos[Partition] = resultInfo
}
return result
}

// cToConsumerGroupListings converts a C rd_kafka_ConsumerGroupListing_t array
// to a Go ConsumerGroupListing slice.
func (a *AdminClient) cToConsumerGroupListings(
@@ -3151,6 +3212,73 @@ func (a *AdminClient) DescribeUserScramCredentials(
return result, nil
}

// ListOffsets describes offsets for the
// specified TopicPartitions based on an OffsetSpec.
//
// Parameters:
// - `ctx` - context with the maximum amount of time to block, or nil for
// indefinite.
// - `topicPartitionOffsets` - a map from TopicPartition to OffsetSpec; each entry holds either an OffsetSpec enum value or a timestamp.
// - `options` - ListOffsetsAdminOption options.
//
// Returns a ListOffsetsResult.
// Each TopicPartition's result can carry an individual error.
func (a *AdminClient) ListOffsets(
ctx context.Context, topicPartitionOffsets map[TopicPartition]OffsetSpec,
options ...ListOffsetsAdminOption) (result ListOffsetsResult, err error) {
	if len(topicPartitionOffsets) < 1 {
		return result, newErrorFromString(ErrInvalidArg, "expected topicPartitionOffsets of size greater than or equal to 1.")
	}

topicPartitions := C.rd_kafka_topic_partition_list_new(C.int(len(topicPartitionOffsets)))
defer C.rd_kafka_topic_partition_list_destroy(topicPartitions)

for tp, offsetValue := range topicPartitionOffsets {
cStr := C.CString(*tp.Topic)
defer C.free(unsafe.Pointer(cStr))
topicPartition := C.rd_kafka_topic_partition_list_add(topicPartitions, cStr, C.int32_t(tp.Partition))
topicPartition.offset = C.int64_t(offsetValue)
}

// Convert Go AdminOptions (if any) to C AdminOptions.
genericOptions := make([]AdminOption, len(options))
for i := range options {
genericOptions[i] = options[i]
}
cOptions, err := adminOptionsSetup(
a.handle, C.RD_KAFKA_ADMIN_OP_LISTOFFSETS, genericOptions)
if err != nil {
return result, err
}
defer C.rd_kafka_AdminOptions_destroy(cOptions)

// Create temporary queue for async operation.
cQueue := C.rd_kafka_queue_new(a.handle.rk)
defer C.rd_kafka_queue_destroy(cQueue)

// Call rd_kafka_ListOffsets (asynchronous).
C.rd_kafka_ListOffsets(
a.handle.rk,
topicPartitions,
cOptions,
cQueue)

// Wait for result, error or context timeout.
rkev, err := a.waitResult(
ctx, cQueue, C.RD_KAFKA_EVENT_LISTOFFSETS_RESULT)
if err != nil {
return result, err
}
defer C.rd_kafka_event_destroy(rkev)

cRes := C.rd_kafka_event_ListOffsets_result(rkev)

// Convert result from C to Go.
result = cToListOffsetsResult(cRes)

return result, nil
}
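A caller-side usage sketch (assuming an existing AdminClient a, the specs map from the earlier sketch, and standard imports — context, fmt, log, time; the 30-second timeout is an arbitrary choice):

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

results, err := a.ListOffsets(ctx, specs,
	kafka.SetAdminIsolationLevel(kafka.IsolationLevelReadCommitted))
if err != nil {
	// Overall failure (e.g. context timeout); per-partition errors
	// surface in results.ResultsInfos instead.
	log.Fatalf("ListOffsets failed: %v", err)
}
for tp, info := range results.ResultsInfos {
	fmt.Printf("%s[%d]: offset %d timestamp %d\n",
		*tp.Topic, tp.Partition, info.Offset, info.Timestamp)
}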

// AlterUserScramCredentials alters SASL/SCRAM credentials.
// The pair (user, mechanism) must be unique among upsertions and deletions.
//