Skip to content

Commit

Permalink
Use command line parser
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Oct 10, 2024
1 parent e68608d commit 3a31587
Showing 1 changed file with 34 additions and 43 deletions.
77 changes: 34 additions & 43 deletions examples/admin_list_consumer_groups/admin_list_consumer_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,71 +19,62 @@ package main

import (
"context"
"flag"
"fmt"
"os"
"strings"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func usage(message string) {
func usage(message string, fs *flag.FlagSet) {
if message != "" {
fmt.Fprintf(os.Stderr,
message)
}
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> [-states <state1> <state2> ...] [-types <type1> <type2> ...] \n", os.Args[0])
fs.Usage()
os.Exit(1)
}

func parseListConsumerGroupsArgs() (states []kafka.ConsumerGroupState, types []kafka.ConsumerGroupType) {
if len(os.Args) > 2 {
args := os.Args[2:]
stateArray := false
typeArray := false
lastArray := 0
for _, arg := range args {
if arg == "-states" {
if stateArray {
usage("Cannot pass the states flag (-states) more than once.\n")
}
lastArray = 1
stateArray = true
} else if arg == "-types" {
if typeArray {
usage("Cannot pass the types flag (-types) more than once.\n")
}
lastArray = 2
typeArray = true
} else {
if lastArray == 1 {
state, _ := kafka.ConsumerGroupStateFromString(arg)
if state == kafka.ConsumerGroupStateUnknown {
usage(fmt.Sprintf("Given state %s is not a valid state\n", arg))
}
states = append(states, state)
} else if lastArray == 2 {
groupType := kafka.ConsumerGroupTypeFromString(arg)
if groupType == kafka.ConsumerGroupTypeUnknown {
usage(fmt.Sprintf("Given type %s is not a valid type\n", arg))
}
types = append(types, groupType)
} else {
usage(fmt.Sprintf("Unknown argument: %s\n", arg))
}
func parseListConsumerGroupsArgs() (
bootstrapServers string,
states []kafka.ConsumerGroupState,
types []kafka.ConsumerGroupType,
) {
var statesString, typesString string
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
fs.StringVar(&bootstrapServers, "b", "localhost:9092", "Bootstrap servers")
fs.StringVar(&statesString, "states", "", "States to match")
fs.StringVar(&typesString, "types", "", "Types to match")
fs.Parse(os.Args[1:])

if statesString != "" {
for _, stateString := range strings.Split(statesString, ",") {
state, _ := kafka.ConsumerGroupStateFromString(stateString)
if state == kafka.ConsumerGroupStateUnknown {
usage(fmt.Sprintf("Given state %s is not a valid state\n",
stateString), fs)
}
states = append(states, state)
}
}
if typesString != "" {
for _, typeString := range strings.Split(typesString, ",") {
groupType := kafka.ConsumerGroupTypeFromString(typeString)
if groupType == kafka.ConsumerGroupTypeUnknown {
usage(fmt.Sprintf("Given type %s is not a valid type\n",
typeString), fs)
}
types = append(types, groupType)
}
}
return
}

func main() {

if len(os.Args) < 2 {
usage("")
}
bootstrapServers := os.Args[1]
states, types := parseListConsumerGroupsArgs()
bootstrapServers, states, types := parseListConsumerGroupsArgs()

// Create a new AdminClient.
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
Expand Down

0 comments on commit 3a31587

Please sign in to comment.