Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter] internal/persistent_queue::OnProcessingFinished is changed to a class function instead of a callback #11338

Merged
merged 4 commits into from
Oct 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions exporter/internal/queue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
return true
}

// Should be called to remove the item of the given index from the queue once processing is finished.
// For in memory queue, this function is noop.
func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) {

Check warning on line 58 in exporter/internal/queue/bounded_memory_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/bounded_memory_queue.go#L58

Added line #L58 was not covered by tests
}

// Shutdown closes the queue channel to initiate draining of the queue.
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
q.sizedChannel.shutdown()
Expand Down
80 changes: 43 additions & 37 deletions exporter/internal/queue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,15 @@
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
for {
var (
req T
onProcessingFinished func(error)
consumed bool
index uint64
req T
consumed bool
)

// If we are stopped we still process all the other events in the channel before, but we
// return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
_, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
index, req, consumed = pq.getNextItem(context.Background())
if !consumed {
return 0
}
Expand All @@ -213,7 +213,8 @@
return false
}
if consumed {
onProcessingFinished(consumeFunc(context.Background(), req))
consumeErr := consumeFunc(context.Background(), req)
pq.OnProcessingFinished(index, consumeErr)
return true
}
}
Expand Down Expand Up @@ -303,20 +304,21 @@
return nil
}

// getNextItem pulls the next available item from the persistent storage along with a callback function that should be
// called after the item is processed to clean up the storage. If no new item is available, returns false.
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), bool) {
// getNextItem pulls the next available item from the persistent storage along with its index. Once processing is
// finished, the index should be called with OnProcessingFinished to clean up the storage. If no new item is available,
// returns false.
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) {
pq.mu.Lock()
defer pq.mu.Unlock()

var request T

if pq.stopped {
return request, nil, false
return 0, request, false
}

if pq.readIndex == pq.writeIndex {
return request, nil, false
return 0, request, false

Check warning on line 321 in exporter/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/persistent_queue.go#L321

Added line #L321 was not covered by tests
}

index := pq.readIndex
Expand All @@ -340,45 +342,49 @@
pq.logger.Error("Error deleting item from queue", zap.Error(err))
}

return request, nil, false
return 0, request, false
}

// Increase the reference count, so the client is not closed while the request is being processed.
// The client cannot be closed because we hold the lock since last we checked `stopped`.
pq.refClient++
return request, func(consumeErr error) {
// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
defer func() {
if err = pq.unrefClient(ctx); err != nil {
pq.logger.Error("Error closing the storage client", zap.Error(err))
}
pq.mu.Unlock()
}()

if experr.IsShutdownErr(consumeErr) {
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
// TODO: Handle partially delivered requests by updating their values in the storage.
return
}
return index, request, true
}

if err = pq.itemDispatchingFinish(ctx, index); err != nil {
pq.logger.Error("Error deleting item from queue", zap.Error(err))
// Should be called to remove the item of the given index from the queue once processing is finished.
func (pq *persistentQueue[T]) OnProcessingFinished(index uint64, consumeErr error) {
// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
defer func() {
if err := pq.unrefClient(context.Background()); err != nil {
pq.logger.Error("Error closing the storage client", zap.Error(err))

Check warning on line 362 in exporter/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/persistent_queue.go#L362

Added line #L362 was not covered by tests
}
pq.mu.Unlock()
}()

// Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.readIndex % 10) == 0 {
if qsErr := pq.backupQueueSize(ctx); qsErr != nil {
pq.logger.Error("Error writing queue size to storage", zap.Error(err))
}
if experr.IsShutdownErr(consumeErr) {
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
// TODO: Handle partially delivered requests by updating their values in the storage.
return
}

if err := pq.itemDispatchingFinish(context.Background(), index); err != nil {
pq.logger.Error("Error deleting item from queue", zap.Error(err))

Check warning on line 374 in exporter/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/persistent_queue.go#L374

Added line #L374 was not covered by tests
}

// Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.readIndex % 10) == 0 {
if qsErr := pq.backupQueueSize(context.Background()); qsErr != nil {
pq.logger.Error("Error writing queue size to storage", zap.Error(qsErr))

Check warning on line 381 in exporter/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/persistent_queue.go#L381

Added line #L381 was not covered by tests
}
}

// Ensure the used size and the channel size are in sync.
pq.sizedChannel.syncSize()
// Ensure the used size and the channel size are in sync.
pq.sizedChannel.syncSize()

}, true
}

// retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
Expand Down
10 changes: 5 additions & 5 deletions exporter/internal/queue/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,19 +463,19 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{})

// Takes index 0 in process.
readReq, _, found := ps.getNextItem(context.Background())
_, readReq, found := ps.getNextItem(context.Background())
require.True(t, found)
assert.Equal(t, req, readReq)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

// This takes item 1 to process.
secondReadReq, onProcessingFinished, found := ps.getNextItem(context.Background())
secondIndex, secondReadReq, found := ps.getNextItem(context.Background())
require.True(t, found)
assert.Equal(t, req, secondReadReq)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0, 1})

// Lets mark item 1 as finished, it will remove it from the currently dispatched items list.
onProcessingFinished(nil)
ps.OnProcessingFinished(secondIndex, nil)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

// Reload the storage. Since items 0 was not finished, this should be re-enqueued at the end.
Expand Down Expand Up @@ -736,12 +736,12 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) {

require.NoError(t, ps.Offer(context.Background(), newTracesRequest(5, 10)))

_, onProcessingFinished, ok := ps.getNextItem(context.Background())
index, _, ok := ps.getNextItem(context.Background())
require.True(t, ok)
assert.False(t, ps.client.(*mockStorageClient).isClosed())
require.NoError(t, ps.Shutdown(context.Background()))
assert.False(t, ps.client.(*mockStorageClient).isClosed())
onProcessingFinished(nil)
ps.OnProcessingFinished(index, nil)
assert.True(t, ps.client.(*mockStorageClient).isClosed())
}

Expand Down
2 changes: 2 additions & 0 deletions exporter/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Queue[T any] interface {
Size() int
// Capacity returns the capacity of the queue.
Capacity() int
// Should be called to remove the item of the given index from the queue once processing is finished.
OnProcessingFinished(index uint64, consumeErr error)
}

type itemsCounter interface {
Expand Down
Loading