diff --git a/examples/.gitignore b/examples/.gitignore
index bbf8dc9c1..d7580eecf 100644
--- a/examples/.gitignore
+++ b/examples/.gitignore
@@ -12,6 +12,7 @@ admin_incremental_alter_configs/admin_incremental_alter_configs
 admin_describe_consumer_groups/admin_describe_consumer_groups
 admin_list_consumer_groups/admin_list_consumer_groups
 admin_list_consumer_group_offsets/admin_list_consumer_group_offsets
+admin_list_offsets/admin_list_offsets
 admin_describe_user_scram_credentials/admin_describe_user_scram_credentials
 admin_alter_user_scram_credentials/admin_alter_user_scram_credentials
 avro_generic_consumer_example/avro_generic_consumer_example
diff --git a/examples/README.md b/examples/README.md
index 05a5d0fb0..982ee4a31 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -5,13 +5,13 @@ Examples
 [admin_alter_consumer_group_offsets](admin_alter_consumer_group_offsets) - Alter Consumer Group Offsets
 
 [admin_create_acls](admin_create_acls) - Create Access Control Lists
- 
+
 [admin_create_topic](admin_create_topic) - Create a topic
 
 [admin_delete_acls](admin_delete_acls) - Delete Access Control Lists using different filters
- 
+
 [admin_delete_topics](admin_delete_topics) - Delete some topics
- 
+
 [admin_delete_consumer_groups](admin_delete_consumer_groups) - Delete consumer groups
 
 [admin_delete_topics](admin_delete_topics) - Delete topics
@@ -28,18 +28,20 @@ Examples
 
 [admin_list_consumer_group_offsets](admin_list_consumer_group_offsets) - List consumer group offsets
 
+[admin_list_offsets](admin_list_offsets) - List partition offsets
+
 [admin_list_consumer_groups](admin_list_consumer_groups) - List consumer groups
- 
+
 [avro_generic_consumer_example](avro_generic_consumer_example) - consumer with Schema Registry and Avro Generic Deserializer
 
 [avro_generic_producer_example](avro_generic_producer_example) - producer with Schema Registry and Avro Generic Serializer
- 
+
 [avro_specific_consumer_example](avro_specific_consumer_example) - consumer with Schema Registry and Avro Specific Deserializer
- 
+
 [avro_specific_producer_example](avro_specific_producer_example) - producer with Schema Registry and Avro Specific Serializer
 
 [consumer_example](consumer_example) - Function & callback based consumer
- 
+
 [consumer_offset_metadata](consumer_offset_metadata) - Commit offset with metadata
 
 [consumer_rebalance_example](consumer_rebalance_example) - Use of rebalance callback with manual commit
@@ -53,29 +55,29 @@ Examples
 [idempotent_producer_example](idempotent_producer_example) - Idempotent producer
 
 [json_consumer_example](json_consumer_example) - consumer with Schema Registry and JSON Schema Deserializer
- 
+
 [json_producer_example](json_producer_example) - producer with Schema Registry and JSON Schema Serializer
- 
+
 [legacy](legacy) - Legacy examples
- 
+
 [library-version](library-version) - Show the library version
 
 [mockcluster_example](mockcluster_example) - Use a mock cluster for testing
- 
+
 [mockcluster_failure_example](mockcluster_failure_example) - Use a mock cluster for failure testing
 
 [oauthbearer_consumer_example](oauthbearer_consumer_example) - Unsecured SASL/OAUTHBEARER consumer example
 
 [oauthbearer_oidc_example](oauthbearer_oidc_example) - SASL/OAUTHBEARER with OIDC method example
- 
+
 [oauthbearer_producer_example](oauthbearer_producer_example) - Unsecured SASL/OAUTHBEARER producer example
 
 [producer_custom_channel_example](producer_custom_channel_example) - Function based producer with a custom delivery channel
 
 [producer_example](producer_example) - Function based producer
- 
+
 [protobuf_consumer_example](protobuf_consumer_example) - consumer with Schema Registry and Protocol Buffers Deserializer
- 
+
 [protobuf_producer_example](protobuf_producer_example) - producer with Schema Registry and Protocol Buffers Serializer
 
 [stats_example](stats_example) - Receiving stats events
diff --git a/examples/admin_list_offsets/admin_list_offsets.go b/examples/admin_list_offsets/admin_list_offsets.go
new file mode 100644
index 000000000..917922bcd
--- /dev/null
+++ b/examples/admin_list_offsets/admin_list_offsets.go
@@ -0,0 +1,117 @@
+/**
+ * Copyright 2023 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// List Offsets example
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strconv"
+
+	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
+)
+
+func main() {
+
+	if len(os.Args) < 2 {
+		fmt.Fprintf(os.Stderr,
+			"Usage: %s <bootstrap-servers> <topic1> <partition1> <EARLIEST|LATEST|MAX_TIMESTAMP|TIMESTAMP> [<timestamp1>] ..\n", os.Args[0])
+		os.Exit(1)
+	}
+
+	bootstrapServers := os.Args[1]
+
+	argsCnt := len(os.Args)
+	i := 2
+	index := 0
+	topicPartitionOffsets := make(map[kafka.TopicPartition]kafka.OffsetSpec)
+	for i < argsCnt {
+		if i+3 > argsCnt {
+			fmt.Fprintf(os.Stderr, "Expected 3 arguments for partition %d, got %d\n", index, argsCnt-i)
+			os.Exit(1)
+		}
+
+		topicName := os.Args[i]
+		partition, err := strconv.Atoi(os.Args[i+1])
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Invalid partition: %s\n", err)
+			os.Exit(1)
+		}
+
+		tp := kafka.TopicPartition{Topic: &topicName, Partition: int32(partition)}
+
+		switch os.Args[i+2] {
+		case "EARLIEST":
+			topicPartitionOffsets[tp] = kafka.EarliestOffsetSpec
+		case "LATEST":
+			topicPartitionOffsets[tp] = kafka.LatestOffsetSpec
+		case "MAX_TIMESTAMP":
+			topicPartitionOffsets[tp] = kafka.MaxTimestampOffsetSpec
+		case "TIMESTAMP":
+			if i+4 > argsCnt {
+				fmt.Fprintf(os.Stderr, "Expected 4 arguments for partition %d, got %d\n", index, argsCnt-i)
+				os.Exit(1)
+			}
+
+			timestamp, timestampErr := strconv.Atoi(os.Args[i+3])
+			if timestampErr != nil {
+				fmt.Fprintf(os.Stderr, "Invalid timestamp: %s\n", timestampErr)
+				os.Exit(1)
+			}
+			topicPartitionOffsets[tp] = kafka.NewOffsetSpecForTimestamp(int64(timestamp))
+			i++
+		default:
+			fmt.Fprintf(os.Stderr, "Invalid OffsetSpec.\n")
+			os.Exit(1)
+		}
+		i += 3
+		index++
+	}
+
+	// Create a new AdminClient.
+	// AdminClient can also be instantiated using an existing
+	// Producer or Consumer instance, see NewAdminClientFromProducer and
+	// NewAdminClientFromConsumer.
+	a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Failed to create Admin client: %s\n", err)
+		os.Exit(1)
+	}
+	defer a.Close()
+
+	// Contexts are used to abort or limit the amount of time
+	// the Admin call blocks waiting for a result.
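+	// A cancellable context is used here; context.WithTimeout would work
+	// just as well if a hard deadline for the call is preferred.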
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	results, err := a.ListOffsets(ctx, topicPartitionOffsets,
+		kafka.SetAdminIsolationLevel(kafka.IsolationLevelReadCommitted))
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Failed to list offsets: %v\n", err)
+		os.Exit(1)
+	}
+
+	// Print the results: ResultsInfos maps each requested TopicPartition
+	// to its ListOffsetsResultInfo.
+	for tp, info := range results.ResultsInfos {
+		fmt.Printf("Topic: %s Partition: %d\n", *tp.Topic, tp.Partition)
+		if info.Error.Code() != kafka.ErrNoError {
+			fmt.Printf("	ErrorCode: %d ErrorMessage: %s\n\n", info.Error.Code(), info.Error.String())
+		} else {
+			fmt.Printf("	Offset: %d Timestamp: %d\n\n", info.Offset, info.Timestamp)
+		}
+	}
+}
diff --git a/kafka/adminapi.go b/kafka/adminapi.go
index e603736a1..1fd333478 100644
--- a/kafka/adminapi.go
+++ b/kafka/adminapi.go
@@ -132,6 +132,13 @@ AlterUserScramCredentials_result_response_by_idx(const rd_kafka_AlterUserScramCr
         return responses[idx];
 }
 
+static const rd_kafka_ListOffsetsResultInfo_t *
+ListOffsetsResultInfo_by_idx(const rd_kafka_ListOffsetsResultInfo_t **result_infos, size_t cnt, size_t idx) {
+        if (idx >= cnt)
+                return NULL;
+        return result_infos[idx];
+}
+
 static const rd_kafka_error_t *
 error_by_idx(const rd_kafka_error_t **errors, size_t cnt, size_t idx) {
         if (idx >= cnt)
@@ -988,6 +995,36 @@ type AlterUserScramCredentialsResult struct {
 	Errors map[string]Error
 }
 
+// OffsetSpec specifies the desired offset to look up with ListOffsets.
+type OffsetSpec int64
+
+const (
+	// MaxTimestampOffsetSpec is used to describe the offset of the message
+	// with the maximum timestamp, which may differ from LatestOffsetSpec
+	// since timestamps can be set client-side.
+	MaxTimestampOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP)
+	// EarliestOffsetSpec is used to describe the earliest offset for the TopicPartition.
+	EarliestOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_EARLIEST)
+	// LatestOffsetSpec is used to describe the latest offset for the TopicPartition.
+	LatestOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_LATEST)
+)
+
+// NewOffsetSpecForTimestamp creates an OffsetSpec corresponding to the given
+// timestamp, in milliseconds since the epoch.
+func NewOffsetSpecForTimestamp(timestamp int64) OffsetSpec {
+	return OffsetSpec(timestamp)
+}
+
+// ListOffsetsResultInfo describes the result of a ListOffsets request
+// for a single TopicPartition.
+type ListOffsetsResultInfo struct {
+	// Offset matching the requested OffsetSpec.
+	Offset Offset
+	// Timestamp of the message at the resulting offset, in milliseconds
+	// since the epoch (when available).
+	Timestamp int64
+	// LeaderEpoch of the partition, or nil if not known.
+	LeaderEpoch *int32
+	// Error, if any, for this partition.
+	Error Error
+}
+
+// ListOffsetsResult holds the map of TopicPartition to
+// ListOffsetsResultInfo for a request.
+type ListOffsetsResult struct {
+	ResultsInfos map[TopicPartition]ListOffsetsResultInfo
+}
+
 // waitResult waits for a result event on cQueue or the ctx to be cancelled, whichever happens
 // first.
 // The returned result event is checked for errors; its error is returned if set.
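For illustration, the new types combine as follows — a minimal sketch, where the topic name, partition layout, and timestamp are made up and not part of this patch:

```go
package main

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

// buildRequests shows one way to assemble a ListOffsets request map,
// mixing the enum-style specs with a timestamp-based spec.
func buildRequests() map[kafka.TopicPartition]kafka.OffsetSpec {
	topic := "example-topic" // hypothetical topic
	return map[kafka.TopicPartition]kafka.OffsetSpec{
		{Topic: &topic, Partition: 0}: kafka.EarliestOffsetSpec,
		{Topic: &topic, Partition: 1}: kafka.LatestOffsetSpec,
		// First offset whose timestamp is at or after the given
		// epoch-milliseconds value.
		{Topic: &topic, Partition: 2}: kafka.NewOffsetSpecForTimestamp(1690000000000),
	}
}
```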
@@ -1380,6 +1417,30 @@ func cToDescribeUserScramCredentialsResult(
 	return result
 }
 
+// cToListOffsetsResult converts a C rd_kafka_ListOffsets_result_t
+// to a Go ListOffsetsResult.
+func cToListOffsetsResult(cRes *C.rd_kafka_ListOffsets_result_t) (result ListOffsetsResult) {
+	result = ListOffsetsResult{ResultsInfos: make(map[TopicPartition]ListOffsetsResultInfo)}
+	var cPartitionCount C.size_t
+	cResultInfos := C.rd_kafka_ListOffsets_result_infos(cRes, &cPartitionCount)
+	for idx := 0; idx < int(cPartitionCount); idx++ {
+		cResultInfo := C.ListOffsetsResultInfo_by_idx(cResultInfos, cPartitionCount, C.size_t(idx))
+		resultInfo := ListOffsetsResultInfo{}
+		cPartition := C.rd_kafka_ListOffsetsResultInfo_topic_partition(cResultInfo)
+		topic := C.GoString(cPartition.topic)
+		tp := TopicPartition{Topic: &topic, Partition: int32(cPartition.partition)}
+		resultInfo.Offset = Offset(cPartition.offset)
+		resultInfo.Timestamp = int64(C.rd_kafka_ListOffsetsResultInfo_timestamp(cResultInfo))
+		cLeaderEpoch := int32(C.rd_kafka_topic_partition_get_leader_epoch(cPartition))
+		if cLeaderEpoch >= 0 {
+			resultInfo.LeaderEpoch = &cLeaderEpoch
+		}
+		resultInfo.Error = newError(cPartition.err)
+		result.ResultsInfos[tp] = resultInfo
+	}
+	return result
+}
+
 // ConsumerGroupDescription converts a C rd_kafka_ConsumerGroupListing_t array
 // to a Go ConsumerGroupListing slice.
 func (a *AdminClient) cToConsumerGroupListings(
@@ -3151,6 +3212,73 @@ func (a *AdminClient) DescribeUserScramCredentials(
 	return result, nil
 }
 
+// ListOffsets describes the offsets of the specified TopicPartitions
+// based on an OffsetSpec.
+//
+// Parameters:
+//   - `ctx` - context with the maximum amount of time to block, or nil for
+//     indefinite.
+//   - `topicPartitionOffsets` - a map from TopicPartition to OffsetSpec;
+//     each value holds either an OffsetSpec enum value or a timestamp.
+//   - `options` - ListOffsetsAdminOption options.
+//
+// Returns a ListOffsetsResult.
+// Each TopicPartition in the result can carry an individual error.
+func (a *AdminClient) ListOffsets(
+	ctx context.Context, topicPartitionOffsets map[TopicPartition]OffsetSpec,
+	options ...ListOffsetsAdminOption) (result ListOffsetsResult, err error) {
+	// len() of a nil map is 0, so this also covers a nil argument.
+	if len(topicPartitionOffsets) < 1 {
+		return result, newErrorFromString(ErrInvalidArg,
+			"expected topicPartitionOffsets with at least one entry")
+	}
+
+	topicPartitions := C.rd_kafka_topic_partition_list_new(C.int(len(topicPartitionOffsets)))
+	defer C.rd_kafka_topic_partition_list_destroy(topicPartitions)
+
+	for tp, offsetValue := range topicPartitionOffsets {
+		cStr := C.CString(*tp.Topic)
+		defer C.free(unsafe.Pointer(cStr))
+		topicPartition := C.rd_kafka_topic_partition_list_add(topicPartitions, cStr, C.int32_t(tp.Partition))
+		topicPartition.offset = C.int64_t(offsetValue)
+	}
+
+	// Convert Go AdminOptions (if any) to C AdminOptions.
+	genericOptions := make([]AdminOption, len(options))
+	for i := range options {
+		genericOptions[i] = options[i]
+	}
+	cOptions, err := adminOptionsSetup(
+		a.handle, C.RD_KAFKA_ADMIN_OP_LISTOFFSETS, genericOptions)
+	if err != nil {
+		return result, err
+	}
+	defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+	// Create temporary queue for async operation.
+	cQueue := C.rd_kafka_queue_new(a.handle.rk)
+	defer C.rd_kafka_queue_destroy(cQueue)
+
+	// Call rd_kafka_ListOffsets (asynchronous).
+	C.rd_kafka_ListOffsets(
+		a.handle.rk,
+		topicPartitions,
+		cOptions,
+		cQueue)
+
+	// Wait for result, error or context timeout.
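+	// The result event will be delivered on cQueue once the operation
+	// completes; waitResult blocks until then or until ctx is done.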
+	rkev, err := a.waitResult(
+		ctx, cQueue, C.RD_KAFKA_EVENT_LISTOFFSETS_RESULT)
+	if err != nil {
+		return result, err
+	}
+	defer C.rd_kafka_event_destroy(rkev)
+
+	cRes := C.rd_kafka_event_ListOffsets_result(rkev)
+
+	// Convert result from C to Go.
+	result = cToListOffsetsResult(cRes)
+
+	return result, nil
+}
+
 // AlterUserScramCredentials alters SASL/SCRAM credentials.
 // The pair (user, mechanism) must be unique among upsertions and deletions.
 //
diff --git a/kafka/adminoptions.go b/kafka/adminoptions.go
index 387330c1f..73aacb2be 100644
--- a/kafka/adminoptions.go
+++ b/kafka/adminoptions.go
@@ -175,6 +175,60 @@ func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout) {
 	return ao
 }
 
+// IsolationLevel is used by AdminOptions to set the isolation level
+// for a request.
+type IsolationLevel int
+
+const (
+	// IsolationLevelReadUncommitted - read uncommitted isolation level
+	IsolationLevelReadUncommitted = IsolationLevel(C.RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED)
+	// IsolationLevelReadCommitted - read committed isolation level
+	IsolationLevelReadCommitted = IsolationLevel(C.RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED)
+)
+
+// AdminOptionIsolationLevel sets the overall request IsolationLevel.
+//
+// Default: `IsolationLevelReadUncommitted`.
+//
+// Valid for ListOffsets.
+type AdminOptionIsolationLevel struct {
+	isSet bool
+	val   IsolationLevel
+}
+
+func (ao AdminOptionIsolationLevel) supportsListOffsets() {
+}
+
+func (ao AdminOptionIsolationLevel) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
+	if !ao.isSet {
+		return nil
+	}
+
+	cError := C.rd_kafka_AdminOptions_set_isolation_level(
+		cOptions, C.rd_kafka_IsolationLevel_t(ao.val))
+	if cError != nil {
+		C.rd_kafka_AdminOptions_destroy(cOptions)
+		return newErrorFromCErrorDestroy(cError)
+	}
+
+	return nil
+}
+
+// SetAdminIsolationLevel sets the overall IsolationLevel for a request.
+//
+// Default: `IsolationLevelReadUncommitted`.
+//
+// Valid for ListOffsets.
+func SetAdminIsolationLevel(isolationLevel IsolationLevel) (ao AdminOptionIsolationLevel) {
+	ao.isSet = true
+	ao.val = isolationLevel
+	return ao
+}
+
 // AdminOptionValidateOnly tells the broker to only validate the request,
 // without performing the requested operation (create topics, etc).
 //
@@ -499,6 +553,14 @@ type AlterUserScramCredentialsAdminOption interface {
 	apply(cOptions *C.rd_kafka_AdminOptions_t) error
 }
 
+// ListOffsetsAdminOption - see setter.
+//
+// See SetAdminRequestTimeout, SetAdminIsolationLevel.
+type ListOffsetsAdminOption interface {
+	supportsListOffsets()
+	apply(cOptions *C.rd_kafka_AdminOptions_t) error
+}
+
 // AdminOption is a generic type not to be used directly.
 //
 // See CreateTopicsAdminOption et.al.
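Combined with the `ListOffsets` method above, the option is used roughly as follows — a sketch assuming an existing `*kafka.AdminClient` and the hypothetical `buildRequests` helper from the earlier sketch:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func listCommittedOffsets(admin *kafka.AdminClient) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Under read-committed isolation, latest offsets are bounded by the
	// last stable offset, so messages in open transactions are excluded.
	results, err := admin.ListOffsets(ctx, buildRequests(),
		kafka.SetAdminIsolationLevel(kafka.IsolationLevelReadCommitted))
	if err != nil {
		log.Fatalf("ListOffsets failed: %v", err)
	}
	for tp, info := range results.ResultsInfos {
		fmt.Printf("%s [%d]: offset %v, timestamp %d\n",
			*tp.Topic, tp.Partition, info.Offset, info.Timestamp)
	}
}
```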
diff --git a/kafka/integration_test.go b/kafka/integration_test.go
index ed57d20ee..c92162729 100644
--- a/kafka/integration_test.go
+++ b/kafka/integration_test.go
@@ -1,5 +1,5 @@
 /**
- * Copyright 2016 Confluent Inc.
+ * Copyright 2023 Confluent Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -3148,6 +3148,93 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() {
 	}
 }
 
+// TestAdminClient_ListOffsets tests the ListOffsets API, which describes
+// the offset of a TopicPartition corresponding to the provided OffsetSpec.
+func (its *IntegrationTestSuite) TestAdminClient_ListOffsets() {
+	t := its.T()
+	bootstrapServers := testconf.Brokers
+	rand.Seed(time.Now().Unix())
+	assert := its.Assert()
+
+	// Create a new AdminClient.
+	a := createAdminClient(t)
+	defer a.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	topicPartitionOffsets := make(map[TopicPartition]OffsetSpec)
+	topic := fmt.Sprintf("%s-%d", testconf.TopicName, rand.Int())
+
+	topics := []TopicSpecification{{Topic: topic, NumPartitions: 1, ReplicationFactor: 1}}
+	createTopicResult, createTopicError := a.CreateTopics(ctx, topics)
+	assert.Nil(createTopicError, "CreateTopics should not fail.")
+	assert.Equal(createTopicResult[0].Error.Code(), ErrNoError, "CreateTopics error code should be ErrNoError.")
+
+	p, err := NewProducer(&ConfigMap{"bootstrap.servers": bootstrapServers})
+	assert.Nil(err, "Unable to create Producer.")
+	defer p.Close()
+
+	// Message-2 gets the largest timestamp (t2), so MAX_TIMESTAMP should
+	// resolve to its offset (1) rather than to the latest offset.
+	timestamp := time.Now()
+	t1 := timestamp.Add(time.Second * 100)
+	t2 := timestamp.Add(time.Second * 300)
+	t3 := timestamp.Add(time.Second * 200)
+
+	p.Produce(&Message{
+		TopicPartition: TopicPartition{Topic: &topic, Partition: 0},
+		Value:          []byte("Message-1"),
+		Timestamp:      t1,
+	}, nil)
+
+	p.Produce(&Message{
+		TopicPartition: TopicPartition{Topic: &topic, Partition: 0},
+		Value:          []byte("Message-2"),
+		Timestamp:      t2,
+	}, nil)
+
+	p.Produce(&Message{
+		TopicPartition: TopicPartition{Topic: &topic, Partition: 0},
+		Value:          []byte("Message-3"),
+		Timestamp:      t3,
+	}, nil)
+
+	p.Flush(5 * 1000)
+
+	tp1 := TopicPartition{Topic: &topic, Partition: 0}
+	topicPartitionOffsets[tp1] = EarliestOffsetSpec
+	var results ListOffsetsResult
+	results, err = a.ListOffsets(ctx, topicPartitionOffsets, SetAdminIsolationLevel(IsolationLevelReadCommitted))
+	assert.Nil(err, "ListOffsets should not fail.")
+
+	for _, info := range results.ResultsInfos {
+		assert.Equal(info.Error.Code(), ErrNoError, "Error code should be ErrNoError.")
+		assert.Equal(info.Offset, Offset(0), "Offset should be 0.")
+	}
+
+	topicPartitionOffsets[tp1] = LatestOffsetSpec
+	results, err = a.ListOffsets(ctx, topicPartitionOffsets, SetAdminIsolationLevel(IsolationLevelReadCommitted))
+	assert.Nil(err, "ListOffsets should not fail.")
+
+	for _, info := range results.ResultsInfos {
+		assert.Equal(info.Error.Code(), ErrNoError, "Error code should be ErrNoError.")
+		assert.Equal(info.Offset, Offset(3), "Offset should be 3.")
+	}
+
+	topicPartitionOffsets[tp1] = MaxTimestampOffsetSpec
+	results, err = a.ListOffsets(ctx, topicPartitionOffsets, SetAdminIsolationLevel(IsolationLevelReadCommitted))
+	assert.Nil(err, "ListOffsets should not fail.")
+
+	for _, info := range results.ResultsInfos {
+		assert.Equal(info.Error.Code(), ErrNoError, "Error code should be ErrNoError.")
+		assert.Equal(info.Offset, Offset(1), "Offset should be 1.")
+	}
+
+	delTopics := []string{topic}
+	_, err = a.DeleteTopics(ctx, delTopics)
+	assert.Nil(err, "DeleteTopics should not fail.")
+}
+
 func TestIntegration(t *testing.T) {
 	its := new(IntegrationTestSuite)
 	testconfInit()