Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP 396 ListOffsets #1029

Merged
merged 12 commits into from
Oct 23, 2023
103 changes: 103 additions & 0 deletions examples/admin_list_offsets/admin_list_offsets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// List Offsets example
package main

import (
"context"
"fmt"
"os"
"strconv"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

if len(os.Args) < 2 {
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> <topicname> <partition> <EARLIEST/LATEST/MAXTIMESTAMP/TIMESTAMP t1> ..\n", os.Args[0])
os.Exit(1)
}

bootstrapServers := os.Args[1]

args := len(os.Args)
i := 2
topicPartitionOffsets := make(map[kafka.TopicPartition]kafka.OffsetSpec)
for i < args {
topicName := os.Args[i]
partition, err := strconv.Atoi(os.Args[i+1])
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid partition: %s\n", err)
os.Exit(1)
}
tp := kafka.TopicPartition{Topic: &topicName, Partition: int32(partition)}
if os.Args[i+2] == "EARLIEST" {
topicPartitionOffsets[tp] = kafka.EarliestOffsetSpec
} else if os.Args[i+2] == "LATEST" {
topicPartitionOffsets[tp] = kafka.LatestOffsetSpec
} else if os.Args[i+2] == "MAXTIMESTAMP" {
topicPartitionOffsets[tp] = kafka.MaxTimestampOffsetSpec
} else if os.Args[i+2] == "TIMESTAMP" {
timestamp, timestampErr := strconv.Atoi(os.Args[i+3])
if timestampErr != nil {
fmt.Fprintf(os.Stderr, "Invalid timestamp: %s\n", timestampErr)
os.Exit(1)
}
topicPartitionOffsets[tp] = kafka.NewOffsetSpecOfTimestamp(int64(timestamp))
i = i + 1
} else {
fmt.Fprintf(os.Stderr, "Invalid OffsetSpec.\n")
os.Exit(1)
}
i = i + 3
}

// Create a new AdminClient.
// AdminClient can also be instantiated using an existing
// Producer or Consumer instance, see NewAdminClientFromProducer and
// NewAdminClientFromConsumer.
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}
defer a.Close()

// Contexts are used to abort or limit the amount of time
// the Admin call blocks waiting for a result.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

results, err := a.ListOffsets(ctx, topicPartitionOffsets, kafka.SetAdminIsolationLevel(kafka.ReadCommitted))
if err != nil {
fmt.Printf("Failed to List offsets: %v\n", err)
os.Exit(1)
}
// map[TopicPartition]ListOffsetsResultInfo
// Print results
for tp, info := range results.Results {
fmt.Printf("Topic: %s Partition_Index : %d\n", *tp.Topic, tp.Partition)
if info.Error.Code() != kafka.ErrNoError {
fmt.Printf(" ErrorCode : %d ErrorMessage : %s\n\n", info.Error.Code(), info.Error.String())
} else {
fmt.Printf(" Offset : %d Timestamp : %d\n\n", info.Offset, info.Timestamp)
}
}

}
128 changes: 128 additions & 0 deletions kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ AlterUserScramCredentials_result_response_by_idx(const rd_kafka_AlterUserScramCr
return responses[idx];
}

static const rd_kafka_ListOffsetsResultInfo_t *
ListOffsetsResultInfo_by_idx(const rd_kafka_ListOffsetsResultInfo_t **result_infos, size_t cnt, size_t idx) {
if (idx >= cnt)
return NULL;
return result_infos[idx];
}

static const rd_kafka_error_t *
error_by_idx(const rd_kafka_error_t **errors, size_t cnt, size_t idx) {
if (idx >= cnt)
Expand Down Expand Up @@ -897,6 +904,39 @@ type AlterUserScramCredentialsResult struct {
Errors map[string]Error
}

// OffsetSpec
// OffsetSpec specifies desired offsets while using ListOffsets.
type OffsetSpec int64

const (
// MaxTimestampOffsetSpec is used to describe the offset with the Max Timestamp which may be different then LatestOffsetSpec as Timestamp can be set client side.
MaxTimestampOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP)
// EarliestOffsetSpec is used to describe the earliest offset for the TopicPartition.
EarliestOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_EARLIEST)
// LatestOffsetSpec is used to describe the latest offset for the TopicPartition.
LatestOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_LATEST)
)

// Creates an OffsetSpec corresponding to the timestamp.
func NewOffsetSpecOfTimestamp(timestamp int64) OffsetSpec {
return OffsetSpec(timestamp)
}

// ListOffsetsResultInfo
// Describes the result of ListOffsets request for a Topic Partition.
type ListOffsetsResultInfo struct {
Offset int64
emasab marked this conversation as resolved.
Show resolved Hide resolved
Timestamp int64
LeaderEpoch int
emasab marked this conversation as resolved.
Show resolved Hide resolved
Error Error
}

// ListOffsetsResult
// Holds the map of TopicPartition to ListOffsetsResultInfo for a request.
type ListOffsetsResult struct {
Results map[TopicPartition]ListOffsetsResultInfo
emasab marked this conversation as resolved.
Show resolved Hide resolved
}

// waitResult waits for a result event on cQueue or the ctx to be cancelled, whichever happens
// first.
// The returned result event is checked for errors its error is returned if set.
Expand Down Expand Up @@ -1105,6 +1145,27 @@ func cToDescribeUserScramCredentialsResult(
return result
}

// cToListOffsetsResult converts a C
// rd_kafka_ListOffsets_result_t to a Go ListOffsetsResult
func cToListOffsetsResult(cRes *C.rd_kafka_ListOffsets_result_t) (result ListOffsetsResult) {
result = ListOffsetsResult{Results: make(map[TopicPartition]ListOffsetsResultInfo)}
var cPartitionCount C.size_t
cResultInfos := C.rd_kafka_ListOffsets_result_infos(cRes, &cPartitionCount)
for itr := 0; itr < int(cPartitionCount); itr++ {
cResultInfo := C.ListOffsetsResultInfo_by_idx(cResultInfos, cPartitionCount, C.size_t(itr))
Value := ListOffsetsResultInfo{}
emasab marked this conversation as resolved.
Show resolved Hide resolved
cPartition := C.rd_kafka_ListOffsetsResultInfo_topic_partition(cResultInfo)
Topic := C.GoString(cPartition.topic)
Partition := TopicPartition{Topic: &Topic, Partition: int32(cPartition.partition)}
Value.Offset = int64(cPartition.offset)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Value.Offset = int64(cPartition.offset)
Value.Offset = kafka.Offset(cPartition.offset)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just using Offset() here since the package is already kafka

Value.Timestamp = int64(C.rd_kafka_ListOffsetsResultInfo_timestamp(cResultInfo))
Value.LeaderEpoch = -1
emasab marked this conversation as resolved.
Show resolved Hide resolved
Value.Error = newError(cPartition.err)
result.Results[Partition] = Value
}
return result
}

// ConsumerGroupDescription converts a C rd_kafka_ConsumerGroupListing_t array
// to a Go ConsumerGroupListing slice.
func (a *AdminClient) cToConsumerGroupListings(
Expand Down Expand Up @@ -2734,6 +2795,73 @@ func (a *AdminClient) DescribeUserScramCredentials(
return result, nil
}

// ListOffsets describe offsets for the
// specified TopicPartiton based on an OffsetSpec.
//
// Parameters:
// - `ctx` - context with the maximum amount of time to block, or nil for
// indefinite.
// - `topicPartitionOffsets` - a map from TopicPartition to OffsetSpec, it holds either the OffsetSpec enum value or timestamp.
// - `options` - ListOffsetsAdminOption options.
//
// Returns a ListOffsetsResult.
// Each TopicPartition's ListOffset can have an individual error.
func (a *AdminClient) ListOffsets(
ctx context.Context, topicPartitionOffsets map[TopicPartition]OffsetSpec,
options ...ListOffsetsAdminOption) (result ListOffsetsResult, err error) {
if len(topicPartitionOffsets) < 1 || topicPartitionOffsets == nil {
return result, newErrorFromString(ErrInvalidArg, "expected topicPartitionOffsets of size greater or equal 1.")
}

topicPartitions := C.rd_kafka_topic_partition_list_new(C.int(len(topicPartitionOffsets)))
defer C.rd_kafka_topic_partition_list_destroy(topicPartitions)

for tp, offsetValue := range topicPartitionOffsets {
cStr := C.CString(*tp.Topic)
defer C.free(unsafe.Pointer(cStr))
topicPartition := C.rd_kafka_topic_partition_list_add(topicPartitions, cStr, C.int32_t(tp.Partition))
topicPartition.offset = C.int64_t(offsetValue)
}

// Convert Go AdminOptions (if any) to C AdminOptions.
genericOptions := make([]AdminOption, len(options))
for i := range options {
genericOptions[i] = options[i]
}
cOptions, err := adminOptionsSetup(
a.handle, C.RD_KAFKA_ADMIN_OP_LISTOFFSETS, genericOptions)
if err != nil {
return result, err
}
defer C.rd_kafka_AdminOptions_destroy(cOptions)

// Create temporary queue for async operation.
cQueue := C.rd_kafka_queue_new(a.handle.rk)
defer C.rd_kafka_queue_destroy(cQueue)

// Call rd_kafka_ListOffsets (asynchronous).
C.rd_kafka_ListOffsets(
a.handle.rk,
topicPartitions,
cOptions,
cQueue)

// Wait for result, error or context timeout.
rkev, err := a.waitResult(
ctx, cQueue, C.RD_KAFKA_EVENT_LISTOFFSETS_RESULT)
if err != nil {
return result, err
}
defer C.rd_kafka_event_destroy(rkev)

cRes := C.rd_kafka_event_ListOffsets_result(rkev)

// Convert result from C to Go.
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
result = cToListOffsetsResult(cRes)

return result, nil
}

// AlterUserScramCredentials alters SASL/SCRAM credentials.
// The pair (user, mechanism) must be unique among upsertions and deletions.
//
Expand Down
59 changes: 59 additions & 0 deletions kafka/adminoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,57 @@ func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout) {
return ao
}

type IsolationLevel int

const (
ReadUncommitted = IsolationLevel(C.RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As other enums:

Suggested change
ReadUncommitted = IsolationLevel(C.RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED)
IsolationLevelReadUncommitted = IsolationLevel(C.RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED)

ReadCommitted = IsolationLevel(C.RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED)
)

// AdminOptionIsolationLevel sets the overall request IsolationLevel.
//
// Default: `ReadUncommitted`.
//
// Valid for ListOffsets.
type AdminOptionIsolationLevel struct {
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
isSet bool
val IsolationLevel
}

func (ao AdminOptionIsolationLevel) supportsListOffsets() {
}
func (ao AdminOptionIsolationLevel) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
if !ao.isSet {
return nil
}

cErrstrSize := C.size_t(512)
cErrstr := (*C.char)(C.malloc(cErrstrSize))
defer C.free(unsafe.Pointer(cErrstr))

cError := C.rd_kafka_AdminOptions_set_isolation_level(
cOptions, C.rd_kafka_IsolationLevel_t(ao.val))
if cError != nil {
C.rd_kafka_AdminOptions_destroy(cOptions)
return newErrorFromCErrorDestroy(cError)

}

return nil

}

// SetAdminIsolationLevel sets the overall IsolationLevel for a request.
//
// Default: `ReadUncommitted`.
//
// Valid for ListOffsets.
func SetAdminIsolationLevel(isolationLevel IsolationLevel) (ao AdminOptionIsolationLevel) {
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
ao.isSet = true
ao.val = isolationLevel
return ao
}

// AdminOptionValidateOnly tells the broker to only validate the request,
// without performing the requested operation (create topics, etc).
//
Expand Down Expand Up @@ -434,6 +485,14 @@ type AlterUserScramCredentialsAdminOption interface {
apply(cOptions *C.rd_kafka_AdminOptions_t) error
}

// ListOffsetsAdminOption - see setter.
mahajanadhitya marked this conversation as resolved.
Show resolved Hide resolved
//
// See SetAdminRequestTimeout, SetAdminIsolationLevel.
type ListOffsetsAdminOption interface {
supportsListOffsets()
apply(cOptions *C.rd_kafka_AdminOptions_t) error
}

// AdminOption is a generic type not to be used directly.
//
// See CreateTopicsAdminOption et.al.
Expand Down
Loading