From 343d74e37bcca25334a0423799427a25c23d5d17 Mon Sep 17 00:00:00 2001 From: kruskall <99559985+kruskall@users.noreply.github.com> Date: Tue, 2 Jan 2024 12:32:43 +0100 Subject: [PATCH] feat: remove async handling code (#197) * feat: remove async handling code * lint: remove unused code --- input/elasticapm/processor.go | 81 ++----------- input/elasticapm/processor_test.go | 178 ++--------------------------- 2 files changed, 18 insertions(+), 241 deletions(-) diff --git a/input/elasticapm/processor.go b/input/elasticapm/processor.go index 807b4528..5d439828 100644 --- a/input/elasticapm/processor.go +++ b/input/elasticapm/processor.go @@ -41,10 +41,6 @@ var ( errEmptyBody = errors.New("empty body") - // ErrQueueFull may be returned by HandleStream when the internal - // queue is full. - ErrQueueFull = errors.New("queue is full") - batchPool sync.Pool ) @@ -243,7 +239,6 @@ func (p *Processor) readBatch( // Callers must not access result concurrently with HandleStream. func (p *Processor) HandleStream( ctx context.Context, - async bool, baseEvent *modelpb.APMEvent, reader io.Reader, batchSize int, @@ -256,25 +251,15 @@ func (p *Processor) HandleStream( // // The semaphore defaults to 200 (N), only allowing N requests to read // an cache Y events (determined by batchSize) from the batch. - // - // Clients can set async to true which makes the processor process the - // events in the background. Returns with an error `ErrQueueFull` - // if the semaphore is full. When asynchronous processing is requested, - // the batches are decoded synchronously, but the batch is processed - // asynchronously. - if err := p.semAcquire(ctx, async); err != nil { + if err := p.semAcquire(ctx); err != nil { return fmt.Errorf("cannot acquire semaphore: %w", err) } sr := p.getStreamReader(reader) - // Release the semaphore on early exit; this will be set to false - // for asynchronous requests once we may no longer exit early. - shouldReleaseSemaphore := true + // Release the semaphore on early exit defer func() { sr.release() - if shouldReleaseSemaphore { - p.sem.Release(1) - } + p.sem.Release(1) }() // The first item is the metadata object. @@ -292,80 +277,42 @@ func (p *Processor) HandleStream( } } - if async { - // The semaphore is released by handleStream - shouldReleaseSemaphore = false - } - first := true for { - err := p.handleStream(ctx, async, baseEvent, batchSize, sr, processor, result, first) + err := p.handleStream(ctx, baseEvent, batchSize, sr, processor, result) if err != nil { if errors.Is(err, io.EOF) { return nil } return fmt.Errorf("cannot handle stream: %w", err) } - if first { - first = false - } } } func (p *Processor) handleStream( ctx context.Context, - async bool, baseEvent *modelpb.APMEvent, batchSize int, sr *streamReader, processor modelpb.BatchProcessor, result *Result, - first bool, -) (readErr error) { - // Async requests will re-aquire the semaphore if it has more events than - // `batchSize`. In that event, the semaphore will be acquired again. If - // the semaphore is full, `ErrQueueFull` is returned. +) error { // The first iteration will not acquire the semaphore since it's already // acquired in the caller function. - var n int - if async { - if !first { - if err := p.semAcquire(ctx, async); err != nil { - return fmt.Errorf("cannot re-acquire semaphore: %w", err) - } - } - defer func() { - // If no events have been read on an asynchronous request, release - // the semaphore since the processing goroutine isn't scheduled. - if n == 0 { - p.sem.Release(1) - } - }() - } var batch modelpb.Batch if b, ok := batchPool.Get().(*modelpb.Batch); ok { batch = (*b)[:0] } - n, readErr = p.readBatch(ctx, baseEvent, batchSize, &batch, sr, result) + n, readErr := p.readBatch(ctx, baseEvent, batchSize, &batch, sr, result) if n == 0 { // No events to process, return the batch to the pool. batchPool.Put(&batch) return readErr } - // Async requests are processed in the background and once the batch has - // been processed, the semaphore is released. - if async { - go func() { - defer p.sem.Release(1) - if err := p.processBatch(ctx, processor, &batch); err != nil { - p.logger.Error("failed handling async request", zap.Error(err)) - } - }() - } else { - if err := p.processBatch(ctx, processor, &batch); err != nil { - return fmt.Errorf("cannot process batch: %w", err) - } - result.Accepted += n + + if err := p.processBatch(ctx, processor, &batch); err != nil { + return fmt.Errorf("cannot process batch: %w", err) } + result.Accepted += n return readErr } @@ -392,16 +339,10 @@ func (p *Processor) getStreamReader(r io.Reader) *streamReader { } } -func (p *Processor) semAcquire(ctx context.Context, async bool) error { +func (p *Processor) semAcquire(ctx context.Context) error { sp, ctx := apm.StartSpan(ctx, "Semaphore.Acquire", "Reporter") defer sp.End() - if async { - if ok := p.sem.TryAcquire(1); !ok { - return ErrQueueFull - } - return nil - } return p.sem.Acquire(ctx, 1) } diff --git a/input/elasticapm/processor_test.go b/input/elasticapm/processor_test.go index 61851f73..3dcea907 100644 --- a/input/elasticapm/processor_test.go +++ b/input/elasticapm/processor_test.go @@ -23,8 +23,6 @@ import ( "errors" "fmt" "strings" - "sync" - "sync/atomic" "testing" "time" @@ -58,7 +56,7 @@ func TestHandleStreamReaderError(t *testing.T) { var actualResult Result err := sp.HandleStream( - context.Background(), false, &modelpb.APMEvent{}, + context.Background(), &modelpb.APMEvent{}, reader, 10, nopBatchProcessor{}, &actualResult, ) assert.ErrorIs(t, err, readErr) @@ -79,9 +77,6 @@ func TestHandleStreamBatchProcessorError(t *testing.T) { }{{ name: "NotQueueFull", err: errors.New("queue is not full, something else is wrong"), - }, { - name: "QueueFull", - err: ErrQueueFull, }} { sp := NewProcessor(Config{ MaxEventSize: 100 * 1024, @@ -93,7 +88,7 @@ func TestHandleStreamBatchProcessorError(t *testing.T) { var actualResult Result err := sp.HandleStream( - context.Background(), false, &modelpb.APMEvent{}, + context.Background(), &modelpb.APMEvent{}, strings.NewReader(payload), 10, processor, &actualResult, ) assert.ErrorIs(t, err, test.err) @@ -191,7 +186,7 @@ func TestHandleStreamErrors(t *testing.T) { Semaphore: semaphore.NewWeighted(1), }) err := p.HandleStream( - context.Background(), false, &modelpb.APMEvent{}, + context.Background(), &modelpb.APMEvent{}, strings.NewReader(test.payload), 10, nopBatchProcessor{}, &actualResult, ) @@ -226,7 +221,7 @@ func TestHandleStream(t *testing.T) { Semaphore: semaphore.NewWeighted(1), }) err := p.HandleStream( - context.Background(), false, &modelpb.APMEvent{}, + context.Background(), &modelpb.APMEvent{}, strings.NewReader(payload), 10, batchProcessor, &Result{}, ) @@ -265,7 +260,7 @@ func TestHandleStreamRUMv3(t *testing.T) { }) var result Result err := p.HandleStream( - context.Background(), false, &modelpb.APMEvent{}, + context.Background(), &modelpb.APMEvent{}, strings.NewReader(payload), 10, batchProcessor, &result, ) @@ -316,7 +311,7 @@ func TestHandleStreamBaseEvent(t *testing.T) { Semaphore: semaphore.NewWeighted(1), }) err := p.HandleStream( - context.Background(), false, &baseEvent, + context.Background(), &baseEvent, strings.NewReader(payload), 10, batchProcessor, &Result{}, ) @@ -353,7 +348,7 @@ func TestLabelLeak(t *testing.T) { Semaphore: semaphore.NewWeighted(1), }) var actualResult Result - err := p.HandleStream(context.Background(), false, baseEvent, strings.NewReader(payload), 10, batchProcessor, &actualResult) + err := p.HandleStream(context.Background(), baseEvent, strings.NewReader(payload), 10, batchProcessor, &actualResult) require.NoError(t, err) txs := processed @@ -374,167 +369,8 @@ func TestLabelLeak(t *testing.T) { assert.Equal(t, modelpb.Labels{"ci_commit": {Global: true, Value: "unknown"}}, modelpb.Labels(txs[1].Labels)) } -func TestConcurrentAsync(t *testing.T) { - smallBatch := validMetadata + "\n" + validTransaction + "\n" - bigBatch := validMetadata + "\n" + strings.Repeat(validTransaction+"\n", 2000) - - type testCase struct { - payload string - sem int64 - requests int - fullSem bool - } - - test := func(tc testCase) (pResult Result) { - var wg sync.WaitGroup - var mu sync.Mutex - p := NewProcessor(Config{ - MaxEventSize: 100 * 1024, - Semaphore: semaphore.NewWeighted(tc.sem), - }) - if tc.fullSem { - for i := int64(0); i < tc.sem; i++ { - p.semAcquire(context.Background(), false) - } - } - handleStream := func(ctx context.Context, bp *accountProcessor) { - wg.Add(1) - go func() { - defer wg.Done() - var result Result - base := &modelpb.APMEvent{ - Host: &modelpb.Host{ - Ip: []*modelpb.IP{ - modelpb.MustParseIP("192.0.0.1"), - }, - }, - } - err := p.HandleStream(ctx, true, base, strings.NewReader(tc.payload), 10, bp, &result) - if err != nil { - result.addError(err) - } - if !tc.fullSem { - select { - case <-bp.batch: - case <-ctx.Done(): - } - } - mu.Lock() - if len(result.Errors) > 0 { - pResult.Errors = append(pResult.Errors, result.Errors...) - } - mu.Unlock() - }() - } - batchProcessor := &accountProcessor{batch: make(chan *modelpb.Batch, tc.requests)} - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel() - for i := 0; i < tc.requests; i++ { - handleStream(ctx, batchProcessor) - } - wg.Wait() - if !tc.fullSem { - // Try to acquire the lock to make sure all the requests have been handled - // and the locks have been released. - for i := int64(0); i < tc.sem; i++ { - p.semAcquire(context.Background(), false) - } - } - processed := batchProcessor.processed.Load() - pResult.Accepted += int(processed) - return - } - - t.Run("semaphore_full", func(t *testing.T) { - res := test(testCase{ - sem: 2, - requests: 3, - fullSem: true, - payload: smallBatch, - }) - assert.Equal(t, 0, res.Accepted) - assert.Equal(t, 3, len(res.Errors)) - for _, err := range res.Errors { - assert.ErrorIs(t, err, ErrQueueFull) - } - }) - t.Run("semaphore_undersized", func(t *testing.T) { - res := test(testCase{ - sem: 2, - requests: 100, - payload: bigBatch, - }) - // When the semaphore is full, `ErrQueueFull` is returned. - assert.Greater(t, len(res.Errors), 0) - for _, err := range res.Errors { - assert.ErrorIs(t, err, ErrQueueFull) - } - }) - t.Run("semaphore_empty", func(t *testing.T) { - res := test(testCase{ - sem: 5, - requests: 5, - payload: smallBatch, - }) - assert.Equal(t, 5, res.Accepted) - assert.Equal(t, 0, len(res.Errors)) - - res = test(testCase{ - sem: 5, - requests: 5, - payload: bigBatch, - }) - assert.GreaterOrEqual(t, res.Accepted, 5) - // all the request will return with an error since only 50 events of - // each (5 requests * batch size) will be processed. - assert.Equal(t, 5, len(res.Errors)) - }) - t.Run("semaphore_empty_incorrect_metadata", func(t *testing.T) { - res := test(testCase{ - sem: 5, - requests: 5, - payload: `{"metadata": {"siervice":{}}}`, - }) - assert.Equal(t, 0, res.Accepted) - assert.Len(t, res.Errors, 5) - - incorrectEvent := `{"metadata": {"service": {"name": "testsvc", "environment": "staging", "version": null, "agent": {"name": "python", "version": "6.9.1"}, "language": {"name": "python", "version": "3.10.4"}, "runtime": {"name": "CPython", "version": "3.10.4"}, "framework": {"name": "flask", "version": "2.1.1"}}, "process": {"pid": 2112739, "ppid": 2112738, "argv": ["/home/stuart/workspace/sdh/581/venv/lib/python3.10/site-packages/flask/__main__.py", "run"], "title": null}, "system": {"hostname": "slaptop", "architecture": "x86_64", "platform": "linux"}, "labels": {"ci_commit": "unknown", "numeric": 1}}} -{"some_incorrect_event": {}}` - res = test(testCase{ - sem: 5, - requests: 2, - payload: incorrectEvent, - }) - assert.Equal(t, 0, res.Accepted) - assert.Len(t, res.Errors, 2) - }) -} - type nopBatchProcessor struct{} func (nopBatchProcessor) ProcessBatch(context.Context, *modelpb.Batch) error { return nil } - -type accountProcessor struct { - batch chan *modelpb.Batch - processed atomic.Uint64 -} - -func (p *accountProcessor) ProcessBatch(ctx context.Context, b *modelpb.Batch) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - if p.batch != nil { - events := b.Clone() - select { - case p.batch <- &events: - case <-ctx.Done(): - return ctx.Err() - } - } - p.processed.Add(1) - return nil -}