From 47e4bd92b864f8f92d14710f0ff01a3e35568e85 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 10 Jul 2023 19:59:24 -0700 Subject: [PATCH] return context error if the context was canceled mid-way (#1852) fixes #1850 --- api-list.go | 53 ++++++++++++++++++++++++++++++++++++++---- api-put-object_test.go | 3 --- functional_tests.go | 12 ++++++---- 3 files changed, 57 insertions(+), 11 deletions(-) diff --git a/api-list.go b/api-list.go index cd2355ca3..3b50f61d3 100644 --- a/api-list.go +++ b/api-list.go @@ -97,7 +97,15 @@ func (c *Client) listObjectsV2(ctx context.Context, bucketName string, opts List // Initiate list objects goroutine here. go func(objectStatCh chan<- ObjectInfo) { - defer close(objectStatCh) + defer func() { + if contextCanceled(ctx) { + objectStatCh <- ObjectInfo{ + Err: ctx.Err(), + } + } + close(objectStatCh) + }() + // Save continuationToken for next request. var continuationToken string for { @@ -304,7 +312,14 @@ func (c *Client) listObjects(ctx context.Context, bucketName string, opts ListOb // Initiate list objects goroutine here. go func(objectStatCh chan<- ObjectInfo) { - defer close(objectStatCh) + defer func() { + if contextCanceled(ctx) { + objectStatCh <- ObjectInfo{ + Err: ctx.Err(), + } + } + close(objectStatCh) + }() marker := opts.StartAfter for { @@ -321,6 +336,7 @@ func (c *Client) listObjects(ctx context.Context, bucketName string, opts ListOb for _, object := range result.Contents { // Save the marker. marker = object.Key + object.ETag = trimEtag(object.ETag) select { // Send object content. case objectStatCh <- object: @@ -393,7 +409,14 @@ func (c *Client) listObjectVersions(ctx context.Context, bucketName string, opts // Initiate list objects goroutine here. go func(resultCh chan<- ObjectInfo) { - defer close(resultCh) + defer func() { + if contextCanceled(ctx) { + resultCh <- ObjectInfo{ + Err: ctx.Err(), + } + } + close(resultCh) + }() var ( keyMarker = "" @@ -699,6 +722,10 @@ func (o *ListObjectsOptions) Set(key, value string) { // for object := range api.ListObjects(ctx, "mytestbucket", minio.ListObjectsOptions{Prefix: "starthere", Recursive:true}) { // fmt.Println(object) // } +// +// If caller cancels the context, then the last entry on the 'chan ObjectInfo' will be the context.Error() +// caller must drain the channel entirely and wait until channel is closed before proceeding, without +// waiting on the channel to be closed completely you might leak goroutines. func (c *Client) ListObjects(ctx context.Context, bucketName string, opts ListObjectsOptions) <-chan ObjectInfo { if opts.WithVersions { return c.listObjectVersions(ctx, bucketName, opts) @@ -739,6 +766,16 @@ func (c *Client) ListIncompleteUploads(ctx context.Context, bucketName, objectPr return c.listIncompleteUploads(ctx, bucketName, objectPrefix, recursive) } +// contextCanceled returns whether a context is canceled. +func contextCanceled(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} + // listIncompleteUploads lists all incomplete uploads. func (c *Client) listIncompleteUploads(ctx context.Context, bucketName, objectPrefix string, recursive bool) <-chan ObjectMultipartInfo { // Allocate channel for multipart uploads. @@ -766,7 +803,15 @@ func (c *Client) listIncompleteUploads(ctx context.Context, bucketName, objectPr return objectMultipartStatCh } go func(objectMultipartStatCh chan<- ObjectMultipartInfo) { - defer close(objectMultipartStatCh) + defer func() { + if contextCanceled(ctx) { + objectMultipartStatCh <- ObjectMultipartInfo{ + Err: ctx.Err(), + } + } + close(objectMultipartStatCh) + }() + // object and upload ID marker for future requests. var objectMarker string var uploadIDMarker string diff --git a/api-put-object_test.go b/api-put-object_test.go index ca35b12ed..dde4777ee 100644 --- a/api-put-object_test.go +++ b/api-put-object_test.go @@ -82,7 +82,6 @@ func Test_SSEHeaders(t *testing.T) { c, err := New("s3.amazonaws.com", &Options{ Transport: rt, }) - if err != nil { t.Error(err) } @@ -147,7 +146,6 @@ func Test_SSEHeaders(t *testing.T) { uploadID: "upId", sse: opts.ServerSideEncryption, }) - if err != nil { t.Error(err) } @@ -167,5 +165,4 @@ func Test_SSEHeaders(t *testing.T) { } }) } - } diff --git a/functional_tests.go b/functional_tests.go index 2bc6b8645..3c83d6f64 100644 --- a/functional_tests.go +++ b/functional_tests.go @@ -2471,7 +2471,8 @@ func testTrailingChecksums() { PO minio.PutObjectOptions }{ // Currently there is no way to override the checksum type. - {header: "x-amz-checksum-crc32c", + { + header: "x-amz-checksum-crc32c", hasher: crc32.New(crc32.MakeTable(crc32.Castagnoli)), ChecksumCRC32C: "set", PO: minio.PutObjectOptions{ @@ -2481,7 +2482,8 @@ func testTrailingChecksums() { PartSize: 5 << 20, }, }, - {header: "x-amz-checksum-crc32c", + { + header: "x-amz-checksum-crc32c", hasher: crc32.New(crc32.MakeTable(crc32.Castagnoli)), ChecksumCRC32C: "set", PO: minio.PutObjectOptions{ @@ -2491,7 +2493,8 @@ func testTrailingChecksums() { PartSize: 6_645_654, // Rather arbitrary size }, }, - {header: "x-amz-checksum-crc32c", + { + header: "x-amz-checksum-crc32c", hasher: crc32.New(crc32.MakeTable(crc32.Castagnoli)), ChecksumCRC32C: "set", PO: minio.PutObjectOptions{ @@ -2501,7 +2504,8 @@ func testTrailingChecksums() { PartSize: 5 << 20, }, }, - {header: "x-amz-checksum-crc32c", + { + header: "x-amz-checksum-crc32c", hasher: crc32.New(crc32.MakeTable(crc32.Castagnoli)), ChecksumCRC32C: "set", PO: minio.PutObjectOptions{