From 9e44f6b11e5920a708647f774e2183fa6cded125 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Mon, 12 Oct 2020 20:17:06 -0600 Subject: [PATCH 1/3] Add check for context.DeadlineExceeded error --- x-pack/filebeat/input/s3/collector.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/s3/collector.go b/x-pack/filebeat/input/s3/collector.go index bf294f94245..d593298ae9d 100644 --- a/x-pack/filebeat/input/s3/collector.go +++ b/x-pack/filebeat/input/s3/collector.go @@ -163,7 +163,12 @@ func (c *s3Collector) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs. return nil case err := <-errC: if err != nil { - c.logger.Warn("Processing message failed, updating visibility timeout") + if err == context.DeadlineExceeded { + c.logger.Info("Context deadline exceeded, updating visibility timeout") + } else { + c.logger.Warn("Processing message failed, updating visibility timeout") + } + err := c.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle) if err != nil { c.logger.Error(fmt.Errorf("SQS ChangeMessageVisibilityRequest failed: %w", err)) From 7e9336d37062e1ac8c3e293275319a3b640aae3e Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Mon, 12 Oct 2020 20:23:24 -0600 Subject: [PATCH 2/3] expose error message in log --- x-pack/filebeat/input/s3/collector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/s3/collector.go b/x-pack/filebeat/input/s3/collector.go index d593298ae9d..875754a6e0d 100644 --- a/x-pack/filebeat/input/s3/collector.go +++ b/x-pack/filebeat/input/s3/collector.go @@ -166,7 +166,7 @@ func (c *s3Collector) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs. if err == context.DeadlineExceeded { c.logger.Info("Context deadline exceeded, updating visibility timeout") } else { - c.logger.Warn("Processing message failed, updating visibility timeout") + c.logger.Warnf("Processing message failed '%w', updating visibility timeout", err) } err := c.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle) From 61dbce91735e1853e6172f93f5dafb495119e3e8 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 13 Oct 2020 09:46:11 -0600 Subject: [PATCH 3/3] return original error message --- x-pack/filebeat/input/s3/collector.go | 27 +++++++++------------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/x-pack/filebeat/input/s3/collector.go b/x-pack/filebeat/input/s3/collector.go index 875754a6e0d..1b890513284 100644 --- a/x-pack/filebeat/input/s3/collector.go +++ b/x-pack/filebeat/input/s3/collector.go @@ -148,8 +148,7 @@ func (c *s3Collector) processMessage(svcS3 s3iface.ClientAPI, message sqs.Messag // read from s3 object and create event for each log line err = c.handleS3Objects(svcS3, s3Infos, errC) if err != nil { - err = fmt.Errorf("handleS3Objects failed: %w", err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("handleS3Objects failed: %w", err)) return err } c.logger.Debugf("handleS3Objects succeed") @@ -303,8 +302,7 @@ func (c *s3Collector) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, e c.logger.Debugf("Processing file from s3 bucket \"%s\" with name \"%s\"", info.name, info.key) err := c.createEventsFromS3Info(svc, info, s3Ctx) if err != nil { - err = fmt.Errorf("createEventsFromS3Info failed processing file from s3 bucket \"%s\" with name \"%s\": %w", info.name, info.key, err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("createEventsFromS3Info failed processing file from s3 bucket \"%s\" with name \"%s\": %w", info.name, info.key, err)) s3Ctx.setError(err) } } @@ -331,8 +329,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, // If the SDK can determine the request or retry delay was canceled // by a context the ErrCodeRequestCanceled error will be returned. if awsErr.Code() == awssdk.ErrCodeRequestCanceled { - err = fmt.Errorf("s3 GetObjectRequest canceled for '%s' from S3 bucket '%s': %w", info.key, info.name, err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("s3 GetObjectRequest canceled for '%s' from S3 bucket '%s': %w", info.key, info.name, err)) return err } @@ -350,16 +347,14 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, isS3ObjGzipped, err := isStreamGzipped(reader) if err != nil { - err = fmt.Errorf("could not determine if S3 object is gzipped: %w", err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("could not determine if S3 object is gzipped: %w", err)) return err } if isS3ObjGzipped { gzipReader, err := gzip.NewReader(reader) if err != nil { - err = fmt.Errorf("gzip.NewReader failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("gzip.NewReader failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)) return err } reader = bufio.NewReader(gzipReader) @@ -371,8 +366,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, decoder := json.NewDecoder(reader) err := c.decodeJSON(decoder, objectHash, info, s3Ctx) if err != nil { - err = fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)) return err } return nil @@ -388,14 +382,12 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, event := createEvent(log, offset, info, objectHash, s3Ctx) err = c.forwardEvent(event) if err != nil { - err = fmt.Errorf("forwardEvent failed: %w", err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err)) return err } return nil } else if err != nil { - err = fmt.Errorf("readStringAndTrimDelimiter failed: %w", err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("readStringAndTrimDelimiter failed: %w", err)) return err } @@ -408,8 +400,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, event := createEvent(log, offset, info, objectHash, s3Ctx) err = c.forwardEvent(event) if err != nil { - err = fmt.Errorf("forwardEvent failed: %w", err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err)) return err } }