Skip to content

Commit

Permalink
Deprecate channel based clients
Browse files Browse the repository at this point in the history
Adds Deprecated: to the godoc for ProduceChannel, and consumer's Event
method.

Adds BUG: to the godoc for Flush and Len to emphasize they don't play
well with ProduceChannel.
  • Loading branch information
milindl committed Nov 9, 2022
1 parent 4b799a0 commit af58728
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
3 changes: 3 additions & 0 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,9 @@ func (c *Consumer) Poll(timeoutMs int) (event Event) {
}

// Events returns the Events channel (if enabled)
//
// Deprecated: Events (channel based consumer) is deprecated in favour
// of Poll.
func (c *Consumer) Events() chan Event {
return c.events
}
Expand Down
8 changes: 6 additions & 2 deletions kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,21 +317,25 @@ func (p *Producer) Logs() chan LogEvent {
}

// ProduceChannel returns the produce *Message channel (write)
//
// Deprecated: ProduceChannel (channel based producer) is deprecated in favour
// of Produce.
// Flush() and Len() are not guaranteed to work correctly with ProduceChannel.
func (p *Producer) ProduceChannel() chan *Message {
return p.produceChannel
}

// Len returns the number of messages and requests waiting to be transmitted to the broker
// as well as delivery reports queued for the application.
// Includes messages on ProduceChannel.
// BUG: Tries to include messages on ProduceChannel, but it's not guaranteed to work.
func (p *Producer) Len() int {
return len(p.produceChannel) + len(p.events) + int(C.rd_kafka_outq_len(p.handle.rk))
}

// Flush and wait for outstanding messages and requests to complete delivery.
// Includes messages on ProduceChannel.
// Runs until value reaches zero or on timeoutMs.
// Returns the number of outstanding events still un-flushed.
// BUG: Tries to include messages on ProduceChannel, but it's not guaranteed to work.
func (p *Producer) Flush(timeoutMs int) int {
termChan := make(chan bool) // unused stand-in termChan

Expand Down

0 comments on commit af58728

Please sign in to comment.