Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter] internal/persistent_queue::OnProcessingFinished is changed to a class function instead of a callback #11338

Merged
merged 4 commits into from
Oct 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions exporter/internal/queue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
return true
}

// Should be called to remove the item of the given index from the queue once processing is finished.
// For in memory queue, this function is noop.
func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) {

Check warning on line 58 in exporter/internal/queue/bounded_memory_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/bounded_memory_queue.go#L58

Added line #L58 was not covered by tests
}

// Shutdown closes the queue channel to initiate draining of the queue.
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
q.sizedChannel.shutdown()
Expand Down
75 changes: 40 additions & 35 deletions exporter/internal/queue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,15 @@
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
for {
var (
req T
onProcessingFinished func(error)
consumed bool
index uint64
req T
consumed bool
)

// If we are stopped we still process all the other events in the channel before, but we
// return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
_, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
index, req, consumed = pq.getNextItem(context.Background())
if !consumed {
return 0
}
Expand All @@ -213,7 +213,8 @@
return false
}
if consumed {
onProcessingFinished(consumeFunc(context.Background(), req))
consumeErr := consumeFunc(context.Background(), req)
pq.OnProcessingFinished(index, consumeErr)
return true
}
}
Expand Down Expand Up @@ -305,18 +306,18 @@

// getNextItem pulls the next available item from the persistent storage along with a callback function that should be
// called after the item is processed to clean up the storage. If no new item is available, returns false.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), bool) {
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) {
pq.mu.Lock()
defer pq.mu.Unlock()

var request T

if pq.stopped {
return request, nil, false
return 0, request, false
}

if pq.readIndex == pq.writeIndex {
return request, nil, false
return 0, request, false

Check warning on line 320 in exporter/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/persistent_queue.go#L320

Added line #L320 was not covered by tests
}

index := pq.readIndex
Expand All @@ -340,45 +341,49 @@
pq.logger.Error("Error deleting item from queue", zap.Error(err))
}

return request, nil, false
return index, request, false
sfc-gh-sili marked this conversation as resolved.
Show resolved Hide resolved
}

// Increase the reference count, so the client is not closed while the request is being processed.
// The client cannot be closed because we hold the lock since last we checked `stopped`.
pq.refClient++
return request, func(consumeErr error) {
// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
defer func() {
if err = pq.unrefClient(ctx); err != nil {
pq.logger.Error("Error closing the storage client", zap.Error(err))
}
pq.mu.Unlock()
}()

if experr.IsShutdownErr(consumeErr) {
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
// TODO: Handle partially delivered requests by updating their values in the storage.
return
}
return index, request, true
}

if err = pq.itemDispatchingFinish(ctx, index); err != nil {
pq.logger.Error("Error deleting item from queue", zap.Error(err))
// Should be called to remove the item of the given index from the queue once processing is finished.
func (pq *persistentQueue[T]) OnProcessingFinished(index uint64, consumeErr error) {
// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
defer func() {
if err := pq.unrefClient(context.Background()); err != nil {
pq.logger.Error("Error closing the storage client", zap.Error(err))

Check warning on line 361 in exporter/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/persistent_queue.go#L361

Added line #L361 was not covered by tests
}
pq.mu.Unlock()
}()

// Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.readIndex % 10) == 0 {
if qsErr := pq.backupQueueSize(ctx); qsErr != nil {
pq.logger.Error("Error writing queue size to storage", zap.Error(err))
}
if experr.IsShutdownErr(consumeErr) {
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
// TODO: Handle partially delivered requests by updating their values in the storage.
return
}

if err := pq.itemDispatchingFinish(context.Background(), index); err != nil {
pq.logger.Error("Error deleting item from queue", zap.Error(err))

Check warning on line 373 in exporter/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/persistent_queue.go#L373

Added line #L373 was not covered by tests
}

// Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.readIndex % 10) == 0 {
if qsErr := pq.backupQueueSize(context.Background()); qsErr != nil {
pq.logger.Error("Error writing queue size to storage", zap.Error(qsErr))

Check warning on line 380 in exporter/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/persistent_queue.go#L380

Added line #L380 was not covered by tests
}
}

// Ensure the used size and the channel size are in sync.
pq.sizedChannel.syncSize()
// Ensure the used size and the channel size are in sync.
pq.sizedChannel.syncSize()

}, true
}

// retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
Expand Down
10 changes: 5 additions & 5 deletions exporter/internal/queue/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,19 +463,19 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{})

// Takes index 0 in process.
readReq, _, found := ps.getNextItem(context.Background())
_, readReq, found := ps.getNextItem(context.Background())
require.True(t, found)
assert.Equal(t, req, readReq)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

// This takes item 1 to process.
secondReadReq, onProcessingFinished, found := ps.getNextItem(context.Background())
secondIndex, secondReadReq, found := ps.getNextItem(context.Background())
require.True(t, found)
assert.Equal(t, req, secondReadReq)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0, 1})

// Lets mark item 1 as finished, it will remove it from the currently dispatched items list.
onProcessingFinished(nil)
ps.OnProcessingFinished(secondIndex, nil)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

// Reload the storage. Since items 0 was not finished, this should be re-enqueued at the end.
Expand Down Expand Up @@ -736,12 +736,12 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) {

require.NoError(t, ps.Offer(context.Background(), newTracesRequest(5, 10)))

_, onProcessingFinished, ok := ps.getNextItem(context.Background())
index, _, ok := ps.getNextItem(context.Background())
require.True(t, ok)
assert.False(t, ps.client.(*mockStorageClient).isClosed())
require.NoError(t, ps.Shutdown(context.Background()))
assert.False(t, ps.client.(*mockStorageClient).isClosed())
onProcessingFinished(nil)
ps.OnProcessingFinished(index, nil)
assert.True(t, ps.client.(*mockStorageClient).isClosed())
}

Expand Down
2 changes: 2 additions & 0 deletions exporter/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Queue[T any] interface {
Size() int
// Capacity returns the capacity of the queue.
Capacity() int
// Should be called to remove the item of the given index from the queue once processing is finished.
OnProcessingFinished(index uint64, consumeErr error)
}

type itemsCounter interface {
Expand Down
Loading