From 13c7f55adb1c49cbb6306ca7c88a327e82ac6a7e Mon Sep 17 00:00:00 2001 From: zeripath Date: Sat, 29 Jan 2022 11:37:08 +0000 Subject: [PATCH] In disk_channel queues synchronously push to disk on shutdown (#18415) Partial Backport of #18415 Instead of using an asynchronous goroutine to push to disk on shutdown just close the datachan and immediately push to the disk. Prevents messages of incompletely flushed queues. Signed-off-by: Andrew Thornton --- modules/queue/queue_bytefifo.go | 8 +++++--- modules/queue/queue_disk_channel.go | 2 +- modules/queue/queue_disk_channel_test.go | 1 - modules/queue/unique_queue_disk_channel.go | 13 ++++++------- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index edde47a62d627..c4d5d20a896ac 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -195,9 +195,11 @@ loop: } } -var errQueueEmpty = fmt.Errorf("empty queue") -var errEmptyBytes = fmt.Errorf("empty bytes") -var errUnmarshal = fmt.Errorf("failed to unmarshal") +var ( + errQueueEmpty = fmt.Errorf("empty queue") + errEmptyBytes = fmt.Errorf("empty bytes") + errUnmarshal = fmt.Errorf("failed to unmarshal") +) func (q *ByteFIFOQueue) doPop() error { q.lock.Lock() diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 72f330670a3dc..199f958bc3775 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -251,8 +251,8 @@ func (q *PersistableChannelQueue) Shutdown() { q.channelQueue.Wait() q.internal.(*LevelQueue).Wait() // Redirect all remaining data in the chan to the internal channel - close(q.channelQueue.dataChan) log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) + close(q.channelQueue.dataChan) for data := range q.channelQueue.dataChan { _ = q.internal.Push(data) atomic.AddInt64(&q.channelQueue.numInQueue, -1) diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index c90d715a73cf2..db12d9575c87a 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -188,5 +188,4 @@ func TestPersistableChannelQueue(t *testing.T) { for _, callback := range callbacks { callback() } - } diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index af42c0913d4da..975421a339395 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -238,13 +238,12 @@ func (q *PersistableChannelUniqueQueue) Shutdown() { q.channelQueue.Wait() q.internal.(*LevelUniqueQueue).Wait() // Redirect all remaining data in the chan to the internal channel - go func() { - log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) - for data := range q.channelQueue.dataChan { - _ = q.internal.Push(data) - } - log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) - }() + close(q.channelQueue.dataChan) + log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.channelQueue.dataChan { + _ = q.internal.Push(data) + } + log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name) }