diff --git a/consumer.go b/consumer.go index 72c4d7cd8..a0828ef22 100644 --- a/consumer.go +++ b/consumer.go @@ -887,6 +887,14 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { request.Version = 4 request.Isolation = bc.consumer.conf.Consumer.IsolationLevel } + // TODO: verify that versions 5-9 didn't introduce breaking change. + // v9 introduces FETCH_REQUEST_TOPIC_V9, + // v7 introduces FORGOTTEN_TOPIC_DATA_V7, SESSION_ID, & SESSION_EPOCH, and + // v5 introduces TOPICS_V5 (subsumed by FETCH_REQUEST_TOPIC_V9). + // There also may be response level changes. + if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) { + request.Version = 10 + } for child := range bc.subscriptions { request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize) diff --git a/produce_set.go b/produce_set.go index bba0f7e1f..cd5c75692 100644 --- a/produce_set.go +++ b/produce_set.go @@ -127,6 +127,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest { if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { req.Version = 3 } + // TODO: check for Version cases between 4-6 inclusive. + // No protocol changes are introduced to the request, but the response changes. + if ps.parent.conf.Version.IsAtLeast(V2_1_0_0) { + req.Version = 7 + } for topic, partitionSets := range ps.msgs { for partition, set := range partitionSets {