Deprecate channel based clients (#894)

Adds Deprecated: to the godoc for ProduceChannel and the Consumer's Events
method.

Adds BUG: to the godoc for Flush and Len to emphasize that they don't play
well with ProduceChannel.

Changes the examples and the changelog.
milindl authored and PrasanthV454 committed Mar 17, 2023
1 parent 8392e54 commit 3885484
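
For context, a minimal, self-contained sketch of the producer-side migration this commit encourages; the broker address, topic name, and the /v2 module import path are illustrative assumptions, not part of this commit:

package main

import (
	"fmt"
	"os"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		fmt.Fprintf(os.Stderr, "%% Failed to create producer: %v\n", err)
		os.Exit(1)
	}
	defer p.Close()

	topic := "test" // illustrative topic name
	msg := kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("hello"),
	}

	// Previously: p.ProduceChannel() <- &msg
	// Now: call Produce() directly so the enqueue error can be handled.
	if err := p.Produce(&msg, nil); err != nil {
		fmt.Fprintf(os.Stderr, "%% Produce error: %v\n", err)
	}

	// Wait for any outstanding delivery reports before exiting.
	p.Flush(15 * 1000)
}

Passing nil as the second argument to Produce() routes the delivery report to the producer's Events() channel rather than a dedicated delivery channel.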
Showing 5 changed files with 67 additions and 49 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -1,12 +1,14 @@
# Confluent's Golang client for Apache Kafka

-## v1.9.3
+## v2.0.0

This is a maintenance release:

* Added SetSaslCredentials. This new method (on the Producer, Consumer, and
  AdminClient) allows modifying the stored SASL PLAIN/SCRAM credentials that
  will be used for subsequent (new) connections to a broker.
+* Channel based producer (Producer `ProduceChannel()`) and channel based
+  consumer (Consumer `Events()`) are deprecated.


## v1.9.2
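As a side note on the SetSaslCredentials entry above, a minimal sketch of how the new method might be called on a Producer (it is also available on the Consumer and AdminClient); the broker address, SASL settings, and credentials are placeholders:

package main

import (
	"fmt"
	"os"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "broker:9092",
		"security.protocol": "SASL_SSL",
		"sasl.mechanism":    "PLAIN",
		"sasl.username":     "initial-user",
		"sasl.password":     "initial-pass",
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "%% Failed to create producer: %v\n", err)
		os.Exit(1)
	}
	defer p.Close()

	// Swap the stored SASL credentials; per the changelog entry, only
	// subsequent (new) broker connections use the updated values.
	if err := p.SetSaslCredentials("rotated-user", "rotated-pass"); err != nil {
		fmt.Fprintf(os.Stderr, "%% SetSaslCredentials error: %v\n", err)
	}
}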
100 changes: 54 additions & 46 deletions examples/go-kafkacat/go-kafkacat.go
@@ -111,7 +111,9 @@ func runProducer(config *kafka.ConfigMap, topic string, partition int32) {
				msg.Value = ([]byte)(line)
			}

-			p.ProduceChannel() <- &msg
+			if err = p.Produce(&msg, nil); err != nil {
+				fmt.Fprintf(os.Stderr, "%% Produce error: %v\n", err)
+			}
		}
	}

@@ -130,57 +132,63 @@ func runConsumer(config *kafka.ConfigMap, topics []string) {

	fmt.Fprintf(os.Stderr, "%% Created Consumer %v\n", c)

-	c.SubscribeTopics(topics, nil)
+	c.SubscribeTopics(topics, func(c *kafka.Consumer, ev kafka.Event) error {
+		var err error = nil
+		switch e := ev.(type) {
+		case kafka.AssignedPartitions:
+			fmt.Fprintf(os.Stderr, "%% %v\n", e)
+			err = c.Assign(e.Partitions)
+			partitionCnt = len(e.Partitions)
+			eofCnt = 0
+		case kafka.RevokedPartitions:
+			fmt.Fprintf(os.Stderr, "%% %v\n", e)
+			err = c.Unassign()
+			partitionCnt = 0
+			eofCnt = 0
+		}
+		return err
+	})

	run := true

-	for run == true {
-		select {
-
-		case sig := <-sigs:
-			fmt.Fprintf(os.Stderr, "%% Terminating on signal %v\n", sig)
-			run = false
+	go func() {
+		sig := <-sigs
+		fmt.Fprintf(os.Stderr, "%% Terminating on signal %v\n", sig)
+		run = false
+	}()

-		case ev := <-c.Events():
-			switch e := ev.(type) {
-			case kafka.AssignedPartitions:
-				fmt.Fprintf(os.Stderr, "%% %v\n", e)
-				c.Assign(e.Partitions)
-				partitionCnt = len(e.Partitions)
-				eofCnt = 0
-			case kafka.RevokedPartitions:
-				fmt.Fprintf(os.Stderr, "%% %v\n", e)
-				c.Unassign()
-				partitionCnt = 0
-				eofCnt = 0
-			case *kafka.Message:
-				if verbosity >= 2 {
-					fmt.Fprintf(os.Stderr, "%% %v:\n", e.TopicPartition)
-				}
-				if keyDelim != "" {
-					if e.Key != nil {
-						fmt.Printf("%s%s", string(e.Key), keyDelim)
-					} else {
-						fmt.Printf("%s", keyDelim)
-					}
-				}
-				fmt.Println(string(e.Value))
-			case kafka.PartitionEOF:
-				fmt.Fprintf(os.Stderr, "%% Reached %v\n", e)
-				eofCnt++
-				if exitEOF && eofCnt >= partitionCnt {
-					run = false
-				}
-			case kafka.Error:
-				// Errors should generally be considered as informational, the client will try to automatically recover
-				fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
-			case kafka.OffsetsCommitted:
-				if verbosity >= 2 {
-					fmt.Fprintf(os.Stderr, "%% %v\n", e)
-				}
-			default:
-				fmt.Fprintf(os.Stderr, "%% Unhandled event %T ignored: %v\n", e, e)
-			}
-		}
-	}
+	for run == true {
+		ev := c.Poll(1000)
+		switch e := ev.(type) {
+		case *kafka.Message:
+			if verbosity >= 2 {
+				fmt.Fprintf(os.Stderr, "%% %v:\n", e.TopicPartition)
+			}
+			if keyDelim != "" {
+				if e.Key != nil {
+					fmt.Printf("%s%s", string(e.Key), keyDelim)
+				} else {
+					fmt.Printf("%s", keyDelim)
+				}
+			}
+			fmt.Println(string(e.Value))
+		case kafka.PartitionEOF:
+			fmt.Fprintf(os.Stderr, "%% Reached %v\n", e)
+			eofCnt++
+			if exitEOF && eofCnt >= partitionCnt {
+				run = false
+			}
+		case kafka.Error:
+			// Errors should generally be considered as informational, the client will try to automatically recover.
+			fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
+		case kafka.OffsetsCommitted:
+			if verbosity >= 2 {
+				fmt.Fprintf(os.Stderr, "%% %v\n", e)
+			}
+		case nil:
+			// Ignore, Poll() timed out.
+		default:
+			fmt.Fprintf(os.Stderr, "%% Unhandled event %T ignored: %v\n", e, e)
+		}
+	}

1 change: 1 addition & 0 deletions examples/legacy/README.md
@@ -41,6 +41,7 @@ Cons:

* Double queueing: messages are first queued in the channel (size is configurable)
and then inside librdkafka.
+* Flushing issues if using channel based producer due to a known bug in the implementation.

See [producer_channel_example](producer_channel_example)

3 changes: 3 additions & 0 deletions kafka/consumer.go
@@ -346,6 +346,9 @@ func (c *Consumer) Poll(timeoutMs int) (event Event) {
}

// Events returns the Events channel (if enabled)
+//
+// Deprecated: Events (channel based consumer) is deprecated in favour
+// of Poll().
func (c *Consumer) Events() chan Event {
	return c.events
}
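
For reference, a minimal Poll()-based consume loop of the kind this deprecation notice points to; the broker address, group id, topic, and the /v2 import path are placeholders:

package main

import (
	"fmt"
	"os"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "example-group",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "%% Failed to create consumer: %v\n", err)
		os.Exit(1)
	}
	defer c.Close()

	if err := c.SubscribeTopics([]string{"test"}, nil); err != nil {
		fmt.Fprintf(os.Stderr, "%% Subscribe error: %v\n", err)
		os.Exit(1)
	}

	for {
		// Poll replaces reading from the deprecated Events() channel.
		switch e := c.Poll(1000).(type) {
		case *kafka.Message:
			fmt.Printf("%s\n", e.Value)
		case kafka.Error:
			fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
		case nil:
			// Poll timed out; keep polling.
		}
	}
}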
8 changes: 6 additions & 2 deletions kafka/producer.go
@@ -317,21 +317,25 @@ func (p *Producer) Logs() chan LogEvent {
}

// ProduceChannel returns the produce *Message channel (write)
+//
+// Deprecated: ProduceChannel (channel based producer) is deprecated in favour
+// of Produce().
+// Flush() and Len() are not guaranteed to be reliable with ProduceChannel.
func (p *Producer) ProduceChannel() chan *Message {
	return p.produceChannel
}

// Len returns the number of messages and requests waiting to be transmitted to the broker
// as well as delivery reports queued for the application.
-// Includes messages on ProduceChannel.
+// BUG: Tries to include messages on ProduceChannel, but it's not guaranteed to be reliable.
func (p *Producer) Len() int {
	return len(p.produceChannel) + len(p.events) + int(C.rd_kafka_outq_len(p.handle.rk))
}

// Flush and wait for outstanding messages and requests to complete delivery.
-// Includes messages on ProduceChannel.
// Runs until value reaches zero or on timeoutMs.
// Returns the number of outstanding events still un-flushed.
+// BUG: Tries to include messages on ProduceChannel, but it's not guaranteed to be reliable.
func (p *Producer) Flush(timeoutMs int) int {
	termChan := make(chan bool) // unused stand-in termChan

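Given the BUG notes above, code that needs a per-message delivery guarantee can pass a dedicated delivery channel to Produce() instead of relying on Len() or Flush(). The following sketch is not part of this commit; the function name is hypothetical and it assumes the usual kafka import:

// produceAndConfirm confirms delivery of one message via a dedicated
// delivery channel instead of relying on Len()/Flush().
func produceAndConfirm(p *kafka.Producer, topic string) error {
	deliveryChan := make(chan kafka.Event, 1)
	defer close(deliveryChan)

	err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("payload"),
	}, deliveryChan)
	if err != nil {
		return err
	}

	// The delivery report for this Produce() call arrives on deliveryChan
	// as a *kafka.Message with TopicPartition.Error set on failure.
	m := (<-deliveryChan).(*kafka.Message)
	return m.TopicPartition.Error
}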
