Skip to content

Commit

Permalink
add code for send error situation
Browse files Browse the repository at this point in the history
  • Loading branch information
kyrietu committed Feb 6, 2021
1 parent d7760b5 commit 36c9ca0
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
18 changes: 18 additions & 0 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ type ConnectionListener interface {
// ReceivedSendReceipt receive and process the return value of the send command.
ReceivedSendReceipt(response *pb.CommandSendReceipt)

//receive and process the return value of the sendError command.
ReceivedSendError(response *pb.CommandSendError)

// ConnectionClosed close the TCP connection.
ConnectionClosed()
}
Expand Down Expand Up @@ -589,6 +592,7 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
c.handleSendReceipt(cmd.GetSendReceipt())

case pb.BaseCommand_SEND_ERROR:
c.handleSendError(cmd.GetSendError())

case pb.BaseCommand_MESSAGE:
c.handleMessage(cmd.GetMessage(), headersAndPayload)
Expand Down Expand Up @@ -698,6 +702,20 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
}
}

func (c *connection) handleSendError(response *pb.CommandSendError) {
producerID := response.GetProducerId()

c.Lock()
producer, ok := c.listeners[producerID]
c.Unlock()

if ok {
producer.ReceivedSendError(response)
} else {
c.log.WithField("producerID", producerID).Warn("Got unexpected send error")
}
}

func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer) {
c.log.Debug("Got Message: ", response)
consumerID := response.GetConsumerId()
Expand Down
30 changes: 30 additions & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,36 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
p.eventsChan <- sr
}

func (p *partitionProducer) ReceivedSendError(response *pb.CommandSendError) {
pendingItems := p.pendingQueue.ReadableSlice()
if len(pendingItems) > 0 {
for _, pi := range pendingItems {
if pi.(*pendingItem).sequenceID != response.GetSequenceId() {
p.cnx.Close()
return
}
// lock the pending item while sending the requests
pi.(*pendingItem).Lock()
defer pi.(*pendingItem).Unlock()

for _, i := range pi.(*pendingItem).sendRequests {
sr := i.(*sendRequest)
if sr.callback != nil {
if response.GetError() == 0 {
sr.callback(nil, sr.msg, errors.New("delay time is too long"))
} else {
sr.callback(nil, sr.msg, errors.New("send error"))
}
}
}
}
} else {
p.log.Warnf("the pending queue is empty, closing connection")
p.cnx.Close()
return
}
}

func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
pi, ok := p.pendingQueue.Peek().(*pendingItem)

Expand Down

0 comments on commit 36c9ca0

Please sign in to comment.