diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 18f22f67e3..a16193f4d4 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -458,13 +458,21 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) pi, ok := p.pendingQueue.Peek().(*pendingItem) if !ok { - p.log.Warnf("Received ack for %v although the pending queue is empty", response.GetMessageId()) + // if we receive a receipt although the pending queue is empty, the state of the broker and the producer differs. + // At that point, it is better to close the connection to the broker to reconnect to a broker hopping it solves + // the state discrepancy. + p.log.Warnf("Received ack for %v although the pending queue is empty, closing connection", response.GetMessageId()) + p.cnx.Close() return } if pi.sequenceID != response.GetSequenceId() { - p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v", response.GetMessageId(), + // if we receive a receipt that is not the one expected, the state of the broker and the producer differs. + // At that point, it is better to close the connection to the broker to reconnect to a broker hopping it solves + // the state discrepancy. + p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(), response.GetSequenceId(), pi.sequenceID) + p.cnx.Close() return }