diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index 34d1213b55cdf1..4615397a5fee0b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -336,36 +336,12 @@ public ListenableFuture call() { } if (committedSize > lastCommittedOffset) { // We have made progress on this upload in the last request. Reset the backoff so - // that - // this request has a full deck of retries + // that this request has a full deck of retries progressiveBackoff.reset(); } } lastCommittedOffset = committedSize; - try { - chunker.seek(committedSize); - } catch (IOException e) { - try { - chunker.reset(); - } catch (IOException resetException) { - e.addSuppressed(resetException); - } - String tooManyOpenFilesError = "Too many open files"; - if (Ascii.toLowerCase(e.getMessage()) - .contains(Ascii.toLowerCase(tooManyOpenFilesError))) { - String newMessage = - "An IOException was thrown because the process opened too" - + " many files. We recommend setting" - + " --bep_maximum_open_remote_upload_files flag to a" - + " number lower than your system default (run 'ulimit" - + " -a' for *nix-based operating systems). Original" - + " error message: " - + e.getMessage(); - return Futures.immediateFailedFuture(new IOException(newMessage, e)); - } - return Futures.immediateFailedFuture(e); - } - return upload(); + return upload(committedSize); }, MoreExecutors.directExecutor()); } @@ -415,12 +391,14 @@ private ListenableFuture query() { MoreExecutors.directExecutor()); } - private ListenableFuture upload() { + private ListenableFuture upload(long pos) { return channel.withChannelFuture( channel -> { SettableFuture uploadResult = SettableFuture.create(); grpcContext.run( - () -> bsAsyncStub(channel).write(new Writer(resourceName, chunker, uploadResult))); + () -> + bsAsyncStub(channel) + .write(new Writer(resourceName, chunker, pos, uploadResult))); return uploadResult; }); } @@ -434,15 +412,18 @@ void cancel() { private static final class Writer implements ClientResponseObserver, Runnable { private final Chunker chunker; + private final long pos; private final String resourceName; private final SettableFuture uploadResult; private long committedSize = -1; private ClientCallStreamObserver requestObserver; private boolean first = true; - private Writer(String resourceName, Chunker chunker, SettableFuture uploadResult) { + private Writer( + String resourceName, Chunker chunker, long pos, SettableFuture uploadResult) { this.resourceName = resourceName; this.chunker = chunker; + this.pos = pos; this.uploadResult = uploadResult; } @@ -459,6 +440,15 @@ public void run() { return; } while (requestObserver.isReady()) { + WriteRequest.Builder request = WriteRequest.newBuilder(); + if (first) { + first = false; + if (!seekChunker()) { + return; + } + // Resource name only needs to be set on the first write for each file. + request.setResourceName(resourceName); + } Chunker.Chunk chunk; try { chunk = chunker.next(); @@ -467,17 +457,12 @@ public void run() { return; } boolean isLastChunk = !chunker.hasNext(); - WriteRequest.Builder request = - WriteRequest.newBuilder() + requestObserver.onNext( + request .setData(chunk.getData()) .setWriteOffset(chunk.getOffset()) - .setFinishWrite(isLastChunk); - if (first) { - first = false; - // Resource name only needs to be set on the first write for each file. - request.setResourceName(resourceName); - } - requestObserver.onNext(request.build()); + .setFinishWrite(isLastChunk) + .build()); if (isLastChunk) { requestObserver.onCompleted(); return; @@ -485,6 +470,32 @@ public void run() { } } + private boolean seekChunker() { + try { + chunker.seek(pos); + } catch (IOException e) { + try { + chunker.reset(); + } catch (IOException resetException) { + e.addSuppressed(resetException); + } + String tooManyOpenFilesError = "Too many open files"; + if (Ascii.toLowerCase(e.getMessage()).contains(Ascii.toLowerCase(tooManyOpenFilesError))) { + String newMessage = + "An IOException was thrown because the process opened too many files. We recommend" + + " setting --bep_maximum_open_remote_upload_files flag to a number lower than" + + " your system default (run 'ulimit -a' for *nix-based operating systems)." + + " Original error message: " + + e.getMessage(); + e = new IOException(newMessage, e); + } + uploadResult.setException(e); + requestObserver.cancel("failed to seek chunk", e); + return false; + } + return true; + } + @Override public void onNext(WriteResponse response) { committedSize = response.getCommittedSize(); diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index de2ff4d1ab44c4..ca3175d0a70326 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -30,6 +30,7 @@ import com.google.bytestream.ByteStreamProto.WriteRequest; import com.google.bytestream.ByteStreamProto.WriteResponse; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.hash.HashCode; @@ -77,7 +78,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import javax.annotation.Nullable; import org.junit.After; @@ -782,23 +782,18 @@ public void tooManyFilesIOException_adviseMaximumOpenFilesFlag() throws Exceptio byte[] blob = new byte[CHUNK_SIZE]; Chunker chunker = Mockito.mock(Chunker.class); Digest digest = DIGEST_UTIL.compute(blob); - AtomicLong committedOffset = new AtomicLong(0); - Mockito.doThrow(new IOException("Too many open files")) - .when(chunker) - .seek(committedOffset.get()); + Mockito.doThrow(new IOException("Too many open files")).when(chunker).seek(0); Mockito.when(chunker.getSize()).thenReturn(digest.getSizeBytes()); - - try { - uploader.uploadBlob(context, digest, chunker); - fail("Should have thrown an exception."); - } catch (IOException e) { - String newMessage = - "An IOException was thrown because the process opened too many files. We recommend" - + " setting --bep_maximum_open_remote_upload_files flag to a number lower than your" - + " system default (run 'ulimit -a' for *nix-based operating systems). Original error" - + " message: Too many open files"; - assertThat(newMessage).isEqualTo(e.getMessage()); - } + serviceRegistry.addService(new MaybeFailOnceUploadService(ImmutableMap.of())); + + String newMessage = + "An IOException was thrown because the process opened too many files. We recommend setting" + + " --bep_maximum_open_remote_upload_files flag to a number lower than your system" + + " default (run 'ulimit -a' for *nix-based operating systems). Original error message:" + + " Too many open files"; + assertThat(assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker))) + .hasMessageThat() + .isEqualTo(newMessage); } @Test