From 36c9ca0cdf1c10c4f939c1de4f1cd69211bf4752 Mon Sep 17 00:00:00 2001 From: kyrietu Date: Sat, 6 Feb 2021 10:01:43 +0800 Subject: [PATCH] add code for send error situation --- pulsar/internal/connection.go | 18 ++++++++++++++++++ pulsar/producer_partition.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 23dc7a1..605befa 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -87,6 +87,9 @@ type ConnectionListener interface { // ReceivedSendReceipt receive and process the return value of the send command. ReceivedSendReceipt(response *pb.CommandSendReceipt) + //receive and process the return value of the sendError command. + ReceivedSendError(response *pb.CommandSendError) + // ConnectionClosed close the TCP connection. ConnectionClosed() } @@ -589,6 +592,7 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl c.handleSendReceipt(cmd.GetSendReceipt()) case pb.BaseCommand_SEND_ERROR: + c.handleSendError(cmd.GetSendError()) case pb.BaseCommand_MESSAGE: c.handleMessage(cmd.GetMessage(), headersAndPayload) @@ -698,6 +702,20 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) { } } +func (c *connection) handleSendError(response *pb.CommandSendError) { + producerID := response.GetProducerId() + + c.Lock() + producer, ok := c.listeners[producerID] + c.Unlock() + + if ok { + producer.ReceivedSendError(response) + } else { + c.log.WithField("producerID", producerID).Warn("Got unexpected send error") + } +} + func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer) { c.log.Debug("Got Message: ", response) consumerID := response.GetConsumerId() diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 8301765..7cf5032 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -680,6 +680,36 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer p.eventsChan <- sr } +func (p *partitionProducer) ReceivedSendError(response *pb.CommandSendError) { + pendingItems := p.pendingQueue.ReadableSlice() + if len(pendingItems) > 0 { + for _, pi := range pendingItems { + if pi.(*pendingItem).sequenceID != response.GetSequenceId() { + p.cnx.Close() + return + } + // lock the pending item while sending the requests + pi.(*pendingItem).Lock() + defer pi.(*pendingItem).Unlock() + + for _, i := range pi.(*pendingItem).sendRequests { + sr := i.(*sendRequest) + if sr.callback != nil { + if response.GetError() == 0 { + sr.callback(nil, sr.msg, errors.New("delay time is too long")) + } else { + sr.callback(nil, sr.msg, errors.New("send error")) + } + } + } + } + } else { + p.log.Warnf("the pending queue is empty, closing connection") + p.cnx.Close() + return + } +} + func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) { pi, ok := p.pendingQueue.Peek().(*pendingItem)