diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java index 6cb60e14984f9..6ebbe5bce582a 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java @@ -107,9 +107,7 @@ private boolean refillBufferIfNeeded() throws IOException { bufferOffsetEnd = endRange; long bytesRead = endRange - startRange + 1; int bytesToCopy = (int) bytesRead; - while (bytesToCopy > 0) { - bytesToCopy -= buffer.writeBytes(stream, bytesToCopy); - } + fillBuffer(stream, bytesToCopy); cursor += buffer.readableBytes(); } @@ -135,6 +133,20 @@ private boolean refillBufferIfNeeded() throws IOException { return true; } + void fillBuffer(InputStream is, int bytesToCopy) throws IOException { + while (bytesToCopy > 0) { + int writeBytes = buffer.writeBytes(is, bytesToCopy); + if (writeBytes < 0) { + break; + } + bytesToCopy -= writeBytes; + } + } + + ByteBuf getBuffer() { + return buffer; + } + @Override public int read() throws IOException { if (refillBufferIfNeeded()) { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java new file mode 100644 index 0000000000000..951180e4e18c8 --- /dev/null +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.offload.jcloud.impl; + +import static org.testng.Assert.assertEquals; + +import java.io.IOException; +import java.io.InputStream; +import org.apache.bookkeeper.mledger.offload.jcloud.BlobStoreTestBase; +import org.testng.annotations.Test; + +public class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { + + @Test + public void testFillBuffer() throws Exception { + BlobStoreBackedInputStreamImpl bis = new BlobStoreBackedInputStreamImpl( + blobStore, BUCKET, "testFillBuffer", (k, md) -> { + }, 2048, 512); + + InputStream is = new InputStream() { + int count = 10; + + @Override + public int read() throws IOException { + if (count-- > 0) { + return 1; + } else { + return -1; + } + } + }; + bis.fillBuffer(is, 20); + assertEquals(bis.getBuffer().readableBytes(), 10); + } +}