From ed47cf25ce252d7f3efbab4dcec590fb31ff1d27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 3 Nov 2021 13:01:35 +0000 Subject: [PATCH] [chunkwriting] Return original buffer to the pool When the buffer isn't filled to capacity a new slice is created that's smaller than the original. In that case the smaller slice is returned to the pool which prevents the rest of the capacity from being used again. The solution is to pass the original slice through and attach the length. This allows the original slice to be returned to the pool once the operation is complete. This change also simplifies the `sendChunk` method, ensuring that the buffer is returned to the TransferManager even when no bytes were read from the reader. --- sdk/storage/azblob/chunkwriting.go | 35 +++++++++++++----------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/sdk/storage/azblob/chunkwriting.go b/sdk/storage/azblob/chunkwriting.go index 2e13640dd5d3..02cd196f8700 100644 --- a/sdk/storage/azblob/chunkwriting.go +++ b/sdk/storage/azblob/chunkwriting.go @@ -112,6 +112,7 @@ type copier struct { type copierChunk struct { buffer []byte id string + length int } // getErr returns an error by priority. First, if a function set an error, it returns that error. Next, if the Context has an error @@ -138,37 +139,31 @@ func (c *copier) sendChunk() error { } n, err := io.ReadFull(c.reader, buffer) - switch { - case err == nil && n == 0: - return nil - case err == nil: + if n > 0 { + // Some data was read, schedule the write. id := c.id.next() c.wg.Add(1) c.o.TransferManager.Run( func() { defer c.wg.Done() - c.write(copierChunk{buffer: buffer[0:n], id: id}) + c.write(copierChunk{buffer: buffer, id: id, length: n}) }, ) - return nil - case err != nil && (err == io.EOF || err == io.ErrUnexpectedEOF) && n == 0: - return io.EOF + } else { + // Return the unused buffer to the manager. + c.o.TransferManager.Put(buffer) } - if err == io.EOF || err == io.ErrUnexpectedEOF { - id := c.id.next() - c.wg.Add(1) - c.o.TransferManager.Run( - func() { - defer c.wg.Done() - c.write(copierChunk{buffer: buffer[0:n], id: id}) - }, - ) + if err == nil { + return nil + } else if err == io.EOF || err == io.ErrUnexpectedEOF { return io.EOF } - if err := c.getErr(); err != nil { - return err + + if cerr := c.getErr(); cerr != nil { + return cerr } + return err } @@ -180,7 +175,7 @@ func (c *copier) write(chunk copierChunk) { return } stageBlockOptions := c.o.getStageBlockOptions() - _, err := c.to.StageBlock(c.ctx, chunk.id, internal.NopCloser(bytes.NewReader(chunk.buffer)), stageBlockOptions) + _, err := c.to.StageBlock(c.ctx, chunk.id, internal.NopCloser(bytes.NewReader(chunk.buffer[:chunk.length])), stageBlockOptions) if err != nil { c.errCh <- fmt.Errorf("write error: %w", err) return