Skip to content

Commit

Permalink
[Filebeat] Add check for context.DeadlineExceeded error (#21732) (#21771
Browse files Browse the repository at this point in the history
)

(cherry picked from commit 7addb4d)
  • Loading branch information
kaiyan-sheng authored Oct 14, 2020
1 parent 971214b commit e6315f8
Showing 1 changed file with 15 additions and 19 deletions.
34 changes: 15 additions & 19 deletions x-pack/filebeat/input/s3/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ func (c *s3Collector) processMessage(svcS3 s3iface.ClientAPI, message sqs.Messag
// read from s3 object and create event for each log line
err = c.handleS3Objects(svcS3, s3Infos, errC)
if err != nil {
err = fmt.Errorf("handleS3Objects failed: %w", err)
c.logger.Error(err)
c.logger.Error(fmt.Errorf("handleS3Objects failed: %w", err))
return err
}
c.logger.Debugf("handleS3Objects succeed")
Expand All @@ -163,7 +162,12 @@ func (c *s3Collector) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.
return nil
case err := <-errC:
if err != nil {
c.logger.Warn("Processing message failed, updating visibility timeout")
if err == context.DeadlineExceeded {
c.logger.Info("Context deadline exceeded, updating visibility timeout")
} else {
c.logger.Warnf("Processing message failed '%w', updating visibility timeout", err)
}

err := c.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle)
if err != nil {
c.logger.Error(fmt.Errorf("SQS ChangeMessageVisibilityRequest failed: %w", err))
Expand Down Expand Up @@ -298,8 +302,7 @@ func (c *s3Collector) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, e
c.logger.Debugf("Processing file from s3 bucket \"%s\" with name \"%s\"", info.name, info.key)
err := c.createEventsFromS3Info(svc, info, s3Ctx)
if err != nil {
err = fmt.Errorf("createEventsFromS3Info failed processing file from s3 bucket \"%s\" with name \"%s\": %w", info.name, info.key, err)
c.logger.Error(err)
c.logger.Error(fmt.Errorf("createEventsFromS3Info failed processing file from s3 bucket \"%s\" with name \"%s\": %w", info.name, info.key, err))
s3Ctx.setError(err)
}
}
Expand All @@ -326,8 +329,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
// If the SDK can determine the request or retry delay was canceled
// by a context the ErrCodeRequestCanceled error will be returned.
if awsErr.Code() == awssdk.ErrCodeRequestCanceled {
err = fmt.Errorf("s3 GetObjectRequest canceled for '%s' from S3 bucket '%s': %w", info.key, info.name, err)
c.logger.Error(err)
c.logger.Error(fmt.Errorf("s3 GetObjectRequest canceled for '%s' from S3 bucket '%s': %w", info.key, info.name, err))
return err
}

Expand All @@ -345,16 +347,14 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,

isS3ObjGzipped, err := isStreamGzipped(reader)
if err != nil {
err = fmt.Errorf("could not determine if S3 object is gzipped: %w", err)
c.logger.Error(err)
c.logger.Error(fmt.Errorf("could not determine if S3 object is gzipped: %w", err))
return err
}

if isS3ObjGzipped {
gzipReader, err := gzip.NewReader(reader)
if err != nil {
err = fmt.Errorf("gzip.NewReader failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)
c.logger.Error(err)
c.logger.Error(fmt.Errorf("gzip.NewReader failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err))
return err
}
reader = bufio.NewReader(gzipReader)
Expand All @@ -366,8 +366,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
decoder := json.NewDecoder(reader)
err := c.decodeJSON(decoder, objectHash, info, s3Ctx)
if err != nil {
err = fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)
c.logger.Error(err)
c.logger.Error(fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err))
return err
}
return nil
Expand All @@ -383,14 +382,12 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
event := createEvent(log, offset, info, objectHash, s3Ctx)
err = c.forwardEvent(event)
if err != nil {
err = fmt.Errorf("forwardEvent failed: %w", err)
c.logger.Error(err)
c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err))
return err
}
return nil
} else if err != nil {
err = fmt.Errorf("readStringAndTrimDelimiter failed: %w", err)
c.logger.Error(err)
c.logger.Error(fmt.Errorf("readStringAndTrimDelimiter failed: %w", err))
return err
}

Expand All @@ -403,8 +400,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
event := createEvent(log, offset, info, objectHash, s3Ctx)
err = c.forwardEvent(event)
if err != nil {
err = fmt.Errorf("forwardEvent failed: %w", err)
c.logger.Error(err)
c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err))
return err
}
}
Expand Down

0 comments on commit e6315f8

Please sign in to comment.