Skip to content

Commit

Permalink
Remote: Prefetch input files into a temporary path first.
Browse files Browse the repository at this point in the history
When building with "build without bytes" and dynamic execution, we need to prefetch input files for local actions. Sometimes multiple local actions share the same input files, so multiple call sites can end up sharing the same download instance. If a local action is cancelled (because the remote branch wins), the download it requested should be cancelled only if that download is not shared with another local action (or if all the related local actions are cancelled).

Before this change, the inputs were written directly to their final destination. This is fine as long as there is no race or bug in the prefetcher. However, that is not the case: see #15010.

The root cause is that, when downloads are cancelled, the partially downloaded files on disk are sometimes not deleted.

By making the prefetcher download input to a temporary path first, we can:
  1. Mitigate the race: only the final move step will potentially cause the race condition.
  2. Provide a way to observe the race: if there is no race, all temporary files should be either moved or deleted. However, when running with this change, many temporary files still exist.

Working towards #12454.

PiperOrigin-RevId: 447473693
  • Loading branch information
coeuvre authored and copybara-github committed May 9, 2022
1 parent 74fff55 commit 280ef69
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Function;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

/**
Expand All @@ -48,11 +51,13 @@ public abstract class AbstractActionInputPrefetcher implements ActionInputPrefet
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private final AsyncTaskCache.NoResult<Path> downloadCache = AsyncTaskCache.NoResult.create();
private final TempPathGenerator tempPathGenerator;

protected final Path execRoot;

protected AbstractActionInputPrefetcher(Path execRoot) {
protected AbstractActionInputPrefetcher(Path execRoot, TempPathGenerator tempPathGenerator) {
this.execRoot = execRoot;
this.tempPathGenerator = tempPathGenerator;
}

protected abstract boolean shouldDownloadInput(
Expand Down Expand Up @@ -113,25 +118,41 @@ private Completable prefetchInput(MetadataProvider metadataProvider, ActionInput
return downloadFileIfNot(path, (p) -> downloadInput(p, input, metadata));
}

/** Downloads file into the {@code path} with given downloader. */
/**
* Downloads file into the {@code path} with given downloader.
*
* <p>The file will be written into a temporary file and moved to the final destination after the
* download finished.
*/
protected Completable downloadFileIfNot(
Path path, Function<Path, ListenableFuture<Void>> downloader) {
AtomicBoolean completed = new AtomicBoolean(false);
Completable download =
toCompletable(() -> downloader.apply(path), directExecutor())
.doOnComplete(() -> finalizeDownload(path))
.doOnError(error -> deletePartialDownload(path))
.doOnDispose(() -> deletePartialDownload(path));
Completable.using(
tempPathGenerator::generateTempPath,
tempPath ->
toCompletable(() -> downloader.apply(tempPath), directExecutor())
.doOnComplete(
() -> {
finalizeDownload(tempPath, path);
completed.set(true);
}),
tempPath -> {
if (!completed.get()) {
deletePartialDownload(tempPath);
}
},
// Set eager=false here because we want to clean up the download *after* the upstream is
// disposed.
/* eager= */ false);
return downloadCache.executeIfNot(path, download);
}

private void finalizeDownload(Path path) {
try {
// The permission of output file is changed to 0555 after action execution. We manually change
// the permission here for the downloaded file to keep this behaviour consistent.
path.chmod(0555);
} catch (IOException e) {
logger.atWarning().withCause(e).log("Failed to chmod 555 on %s", path);
}
/**
 * Finishes a download by setting the temporary file's permission to 0555 and then moving it to
 * its final destination.
 *
 * @param tmpPath the temporary file the download was written to
 * @param path the final destination of the downloaded file
 * @throws IOException if changing the permission or moving the file fails
 */
private void finalizeDownload(Path tmpPath, Path path) throws IOException {
// The permission of output file is changed to 0555 after action execution. We manually change
// the permission here for the downloaded file to keep this behaviour consistent.
tmpPath.chmod(0555);
FileSystemUtils.moveFile(tmpPath, path);
}

private void deletePartialDownload(Path path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.sandbox.SandboxHelpers;
import com.google.devtools.build.lib.vfs.Path;
Expand All @@ -49,8 +50,12 @@ class RemoteActionInputFetcher extends AbstractActionInputPrefetcher {
private final RemoteCache remoteCache;

RemoteActionInputFetcher(
String buildRequestId, String commandId, RemoteCache remoteCache, Path execRoot) {
super(execRoot);
String buildRequestId,
String commandId,
RemoteCache remoteCache,
Path execRoot,
TempPathGenerator tempPathGenerator) {
super(execRoot, tempPathGenerator);
this.buildRequestId = Preconditions.checkNotNull(buildRequestId);
this.commandId = Preconditions.checkNotNull(commandId);
this.remoteCache = Preconditions.checkNotNull(remoteCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.runtime.BlazeModule;
Expand Down Expand Up @@ -906,7 +907,8 @@ public void registerActionContexts(
}

@Override
public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorBuilder builder) {
public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorBuilder builder)
throws AbruptExitException {
Preconditions.checkState(actionInputFetcher == null, "actionInputFetcher must be null");
Preconditions.checkNotNull(remoteOptions, "remoteOptions must not be null");

Expand All @@ -918,12 +920,27 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB
env.getOptions().getOptions(RemoteOptions.class), "RemoteOptions");
RemoteOutputsMode remoteOutputsMode = remoteOptions.remoteOutputsMode;
if (!remoteOutputsMode.downloadAllOutputs() && actionContextProvider.getRemoteCache() != null) {
Path tempDir = env.getActionTempsDirectory().getChild("remote");
try {
if (tempDir.exists()
&& (!tempDir.isDirectory() || !tempDir.getDirectoryEntries().isEmpty())) {
env.getReporter()
.handle(Event.warn("Found incomplete downloads from previous build, deleting..."));
tempDir.deleteTree();
}
} catch (IOException e) {
throw createExitException(
e.getMessage(),
ExitCode.LOCAL_ENVIRONMENTAL_ERROR,
Code.DOWNLOADED_INPUTS_DELETION_FAILURE);
}
actionInputFetcher =
new RemoteActionInputFetcher(
env.getBuildRequestId(),
env.getCommandId().toString(),
actionContextProvider.getRemoteCache(),
env.getExecRoot());
env.getExecRoot(),
new TempPathGenerator(tempDir));
builder.setActionInputPrefetcher(actionInputFetcher);
remoteOutputService.setActionInputFetcher(actionInputFetcher);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2022 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.util;

import com.google.common.annotations.VisibleForTesting;
import com.google.devtools.build.lib.vfs.Path;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.ThreadSafe;

/**
 * Hands out temporary paths that are direct children of a fixed directory.
 *
 * <p>Each path is named {@code <n>.tmp}, where {@code n} is taken from a counter that starts at
 * zero and is advanced atomically on every request, so concurrent callers receive different
 * names.
 */
@ThreadSafe
public class TempPathGenerator {
  private final AtomicInteger counter = new AtomicInteger();
  private final Path tempDir;

  public TempPathGenerator(Path tempDir) {
    this.tempDir = tempDir;
  }

  /** Returns the next temporary path, a child of the temp directory named {@code <n>.tmp}. */
  public Path generateTempPath() {
    int id = counter.getAndIncrement();
    String name = id + ".tmp";
    return tempDir.getChild(name);
  }

  /** Returns the directory under which the temporary paths are generated. */
  @VisibleForTesting
  public Path getTempDir() {
    return tempDir;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.Mockito.when;

import build.bazel.remote.execution.v2.Digest;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
Expand All @@ -43,6 +44,7 @@
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.InMemoryCacheClient;
import com.google.devtools.build.lib.remote.util.StaticMetadataProvider;
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
import com.google.devtools.build.lib.vfs.DigestHashFunction;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
Expand Down Expand Up @@ -70,6 +72,7 @@ public class RemoteActionInputFetcherTest {
private static final DigestHashFunction HASH_FUNCTION = DigestHashFunction.SHA256;

private Path execRoot;
private TempPathGenerator tempPathGenerator;
private ArtifactRoot artifactRoot;
private RemoteOptions options;
private DigestUtil digestUtil;
Expand All @@ -79,6 +82,9 @@ public void setUp() throws IOException {
FileSystem fs = new InMemoryFileSystem(new JavaClock(), HASH_FUNCTION);
execRoot = fs.getPath("/exec");
execRoot.createDirectoryAndParents();
Path tempDir = fs.getPath("/tmp");
tempDir.createDirectoryAndParents();
tempPathGenerator = new TempPathGenerator(tempDir);
Path dev = fs.getPath("/dev");
dev.createDirectory();
dev.setWritable(false);
Expand All @@ -98,7 +104,7 @@ public void testFetching() throws Exception {
MetadataProvider metadataProvider = new StaticMetadataProvider(metadata);
RemoteCache remoteCache = newCache(options, digestUtil, cacheEntries);
RemoteActionInputFetcher actionInputFetcher =
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);

// act
wait(actionInputFetcher.prefetchFiles(metadata.keySet(), metadataProvider));
Expand All @@ -121,7 +127,7 @@ public void testStagingVirtualActionInput() throws Exception {
MetadataProvider metadataProvider = new StaticMetadataProvider(new HashMap<>());
RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>());
RemoteActionInputFetcher actionInputFetcher =
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);
VirtualActionInput a = ActionsTestUtil.createVirtualActionInput("file1", "hello world");

// act
Expand All @@ -141,7 +147,7 @@ public void testStagingEmptyVirtualActionInput() throws Exception {
MetadataProvider metadataProvider = new StaticMetadataProvider(new HashMap<>());
RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>());
RemoteActionInputFetcher actionInputFetcher =
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);

// act
wait(
Expand All @@ -164,7 +170,7 @@ public void testFileNotFound() throws Exception {
MetadataProvider metadataProvider = new StaticMetadataProvider(metadata);
RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>());
RemoteActionInputFetcher actionInputFetcher =
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);

// act
assertThrows(
Expand All @@ -188,7 +194,7 @@ public void testIgnoreNoneRemoteFiles() throws Exception {
MetadataProvider metadataProvider = new StaticMetadataProvider(ImmutableMap.of(a, f));
RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>());
RemoteActionInputFetcher actionInputFetcher =
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);

// act
wait(actionInputFetcher.prefetchFiles(ImmutableList.of(a), metadataProvider));
Expand All @@ -206,7 +212,7 @@ public void testDownloadFile() throws Exception {
Artifact a1 = createRemoteArtifact("file1", "hello world", metadata, cacheEntries);
RemoteCache remoteCache = newCache(options, digestUtil, cacheEntries);
RemoteActionInputFetcher actionInputFetcher =
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);

// act
actionInputFetcher.downloadFile(a1.getPath(), metadata.get(a1));
Expand All @@ -227,23 +233,15 @@ public void testDownloadFile_onInterrupt_deletePartialDownloadedFile() throws Ex
Map<Digest, ByteString> cacheEntries = new HashMap<>();
Artifact a1 = createRemoteArtifact("file1", "hello world", metadata, cacheEntries);
RemoteCache remoteCache = mock(RemoteCache.class);
when(remoteCache.downloadFile(any(), any(), any()))
.thenAnswer(
invocation -> {
Path path = invocation.getArgument(1);
Digest digest = invocation.getArgument(2);
ByteString content = cacheEntries.get(digest);
if (content == null) {
return Futures.immediateFailedFuture(new IOException("Not found"));
}
content.writeTo(path.getOutputStream());

startSemaphore.release();
return SettableFuture
.create(); // A future that never complete so we can interrupt later
});
mockDownload(
remoteCache,
cacheEntries,
() -> {
startSemaphore.release();
return SettableFuture.create(); // A future that never complete so we can interrupt later
});
RemoteActionInputFetcher actionInputFetcher =
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);

AtomicBoolean interrupted = new AtomicBoolean(false);
Thread t =
Expand All @@ -265,6 +263,7 @@ public void testDownloadFile_onInterrupt_deletePartialDownloadedFile() throws Ex

assertThat(interrupted.get()).isTrue();
assertThat(a1.getPath().exists()).isFalse();
assertThat(tempPathGenerator.getTempDir().getDirectoryEntries()).isEmpty();
}

@Test
Expand All @@ -279,9 +278,9 @@ public void testPrefetchFiles_multipleThreads_downloadIsNotCancelledByOtherThrea
MetadataProvider metadataProvider = new StaticMetadataProvider(metadata);
SettableFuture<Void> download = SettableFuture.create();
RemoteCache remoteCache = mock(RemoteCache.class);
when(remoteCache.downloadFile(any(), any(), any())).thenAnswer(invocation -> download);
mockDownload(remoteCache, cacheEntries, () -> download);
RemoteActionInputFetcher actionInputFetcher =
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);
Thread cancelledThread =
new Thread(
() -> {
Expand Down Expand Up @@ -326,6 +325,8 @@ public void testPrefetchFiles_multipleThreads_downloadIsNotCancelledByOtherThrea

// assert
assertThat(successful.get()).isTrue();
assertThat(FileSystemUtils.readContent(artifact.getPath(), StandardCharsets.UTF_8))
.isEqualTo("hello world");
}

@Test
Expand All @@ -340,9 +341,9 @@ public void testPrefetchFiles_multipleThreads_downloadIsCancelled() throws Excep

SettableFuture<Void> download = SettableFuture.create();
RemoteCache remoteCache = mock(RemoteCache.class);
when(remoteCache.downloadFile(any(), any(), any())).thenAnswer(invocation -> download);
mockDownload(remoteCache, cacheEntries, () -> download);
RemoteActionInputFetcher actionInputFetcher =
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);

Thread cancelledThread1 =
new Thread(
Expand Down Expand Up @@ -376,6 +377,8 @@ public void testPrefetchFiles_multipleThreads_downloadIsCancelled() throws Excep

// assert
assertThat(download.isCancelled()).isTrue();
assertThat(artifact.getPath().exists()).isFalse();
assertThat(tempPathGenerator.getTempDir().getDirectoryEntries()).isEmpty();
}

private Artifact createRemoteArtifact(
Expand Down Expand Up @@ -420,4 +423,23 @@ private static void wait(ListenableFuture<Void> future) throws IOException, Inte
throw e;
}
}

/**
 * Stubs {@code remoteCache.downloadFile(...)} for tests.
 *
 * <p>When the requested digest has an entry in {@code cacheEntries}, its bytes are written to the
 * requested path and the future produced by {@code resultSupplier} is returned. Otherwise an
 * immediately failed future carrying an {@link IOException} is returned.
 */
private static void mockDownload(
    RemoteCache remoteCache,
    Map<Digest, ByteString> cacheEntries,
    Supplier<ListenableFuture<Void>> resultSupplier)
    throws IOException {
  when(remoteCache.downloadFile(any(), any(), any()))
      .thenAnswer(
          invocation -> {
            Path destination = invocation.getArgument(1);
            Digest requestedDigest = invocation.getArgument(2);
            ByteString blob = cacheEntries.get(requestedDigest);
            if (blob != null) {
              // Known digest: materialize the content, then let the caller decide how the
              // download future behaves.
              FileSystemUtils.writeContent(destination, blob.toByteArray());
              return resultSupplier.get();
            }
            return Futures.immediateFailedFuture(new IOException("Not found"));
          });
}
}

0 comments on commit 280ef69

Please sign in to comment.