Skip to content

Commit

Permalink
feat: remove async handling code (#197)
Browse files Browse the repository at this point in the history
* feat: remove async handling code

* lint: remove unused code
  • Loading branch information
kruskall committed Jan 2, 2024
1 parent dca9298 commit 343d74e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 241 deletions.
81 changes: 11 additions & 70 deletions input/elasticapm/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ var (

errEmptyBody = errors.New("empty body")

// ErrQueueFull may be returned by HandleStream when the internal
// queue is full.
ErrQueueFull = errors.New("queue is full")

batchPool sync.Pool
)

Expand Down Expand Up @@ -243,7 +239,6 @@ func (p *Processor) readBatch(
// Callers must not access result concurrently with HandleStream.
func (p *Processor) HandleStream(
ctx context.Context,
async bool,
baseEvent *modelpb.APMEvent,
reader io.Reader,
batchSize int,
Expand All @@ -256,25 +251,15 @@ func (p *Processor) HandleStream(
//
// The semaphore defaults to 200 (N), only allowing N requests to read
// an cache Y events (determined by batchSize) from the batch.
//
// Clients can set async to true which makes the processor process the
// events in the background. Returns with an error `ErrQueueFull`
// if the semaphore is full. When asynchronous processing is requested,
// the batches are decoded synchronously, but the batch is processed
// asynchronously.
if err := p.semAcquire(ctx, async); err != nil {
if err := p.semAcquire(ctx); err != nil {
return fmt.Errorf("cannot acquire semaphore: %w", err)
}
sr := p.getStreamReader(reader)

// Release the semaphore on early exit; this will be set to false
// for asynchronous requests once we may no longer exit early.
shouldReleaseSemaphore := true
// Release the semaphore on early exit
defer func() {
sr.release()
if shouldReleaseSemaphore {
p.sem.Release(1)
}
p.sem.Release(1)
}()

// The first item is the metadata object.
Expand All @@ -292,80 +277,42 @@ func (p *Processor) HandleStream(
}
}

if async {
// The semaphore is released by handleStream
shouldReleaseSemaphore = false
}
first := true
for {
err := p.handleStream(ctx, async, baseEvent, batchSize, sr, processor, result, first)
err := p.handleStream(ctx, baseEvent, batchSize, sr, processor, result)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return fmt.Errorf("cannot handle stream: %w", err)
}
if first {
first = false
}
}
}

func (p *Processor) handleStream(
ctx context.Context,
async bool,
baseEvent *modelpb.APMEvent,
batchSize int,
sr *streamReader,
processor modelpb.BatchProcessor,
result *Result,
first bool,
) (readErr error) {
// Async requests will re-aquire the semaphore if it has more events than
// `batchSize`. In that event, the semaphore will be acquired again. If
// the semaphore is full, `ErrQueueFull` is returned.
) error {
// The first iteration will not acquire the semaphore since it's already
// acquired in the caller function.
var n int
if async {
if !first {
if err := p.semAcquire(ctx, async); err != nil {
return fmt.Errorf("cannot re-acquire semaphore: %w", err)
}
}
defer func() {
// If no events have been read on an asynchronous request, release
// the semaphore since the processing goroutine isn't scheduled.
if n == 0 {
p.sem.Release(1)
}
}()
}
var batch modelpb.Batch
if b, ok := batchPool.Get().(*modelpb.Batch); ok {
batch = (*b)[:0]
}
n, readErr = p.readBatch(ctx, baseEvent, batchSize, &batch, sr, result)
n, readErr := p.readBatch(ctx, baseEvent, batchSize, &batch, sr, result)
if n == 0 {
// No events to process, return the batch to the pool.
batchPool.Put(&batch)
return readErr
}
// Async requests are processed in the background and once the batch has
// been processed, the semaphore is released.
if async {
go func() {
defer p.sem.Release(1)
if err := p.processBatch(ctx, processor, &batch); err != nil {
p.logger.Error("failed handling async request", zap.Error(err))
}
}()
} else {
if err := p.processBatch(ctx, processor, &batch); err != nil {
return fmt.Errorf("cannot process batch: %w", err)
}
result.Accepted += n

if err := p.processBatch(ctx, processor, &batch); err != nil {
return fmt.Errorf("cannot process batch: %w", err)
}
result.Accepted += n
return readErr
}

Expand All @@ -392,16 +339,10 @@ func (p *Processor) getStreamReader(r io.Reader) *streamReader {
}
}

func (p *Processor) semAcquire(ctx context.Context, async bool) error {
func (p *Processor) semAcquire(ctx context.Context) error {
sp, ctx := apm.StartSpan(ctx, "Semaphore.Acquire", "Reporter")
defer sp.End()

if async {
if ok := p.sem.TryAcquire(1); !ok {
return ErrQueueFull
}
return nil
}
return p.sem.Acquire(ctx, 1)
}

Expand Down
Loading

0 comments on commit 343d74e

Please sign in to comment.