Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force synchronous upload and reuse of possibly modified spawn outputs #23382

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,14 @@ default String getProgressMessage(RepositoryMapping mainRepositoryMapping) {
default boolean mayInsensitivelyPropagateInputs() {
return false;
}

/**
* Returns true if the action may modify spawn outputs after the spawn has executed.
*
* <p>If this returns true, any kind of spawn output caching or reuse needs to happen
* synchronously directly after the spawn execution.
*/
default boolean mayModifySpawnOutputsAfterExecution() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,16 @@ private static ImmutableSet<Artifact> nonNullAsSet(Artifact... artifacts) {
this.isExecutedOnWindows = isExecutedOnWindows;
}

@Override
public boolean mayModifySpawnOutputsAfterExecution() {
// Test actions modify test spawn outputs after execution:
// - if there are multiple attempts (unavoidable);
// - in all cases due to appending any stray stderr output to the test log in
// StandaloneTestStrategy.
// TODO: Get rid of the second case and only return true if there are multiple attempts.
return true;
}

public boolean isExecutedOnWindows() {
return isExecutedOnWindows;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,14 @@ public InMemoryOutput downloadOutputs(RemoteAction action, RemoteActionResult re
public static final class LocalExecution {
private final RemoteAction action;
private final SettableFuture<SpawnResult> spawnResultFuture;
private final Phaser spawnResultConsumers =
new Phaser(1) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
// We only use a single phase.
return true;
}
};

private LocalExecution(RemoteAction action) {
this.action = action;
Expand All @@ -1356,6 +1364,37 @@ public static LocalExecution createIfDeduplicatable(RemoteAction action) {
return new LocalExecution(action);
}

/**
* Attempts to register a thread waiting for the {@link #spawnResultFuture} to become available
* and returns true if successful.
*
* <p>Every call to this method must be matched by a call to {@link #unregister()} via
* try-finally.
*
* <p>This always returns true for actions that do not modify their spawns' outputs after
* execution.
*/
public boolean registerForOutputReuse() {
// We only use a single phase.
return spawnResultConsumers.register() == 0;
}

/**
* Unregisters a thread waiting for the {@link #spawnResultFuture}, either after successful
* reuse of the outputs or upon failure.
*/
public void unregister() {
spawnResultConsumers.arriveAndDeregister();
}

/**
* Waits for all potential consumers of the {@link #spawnResultFuture} to be done with their
* output reuse.
*/
public void awaitAllOutputReuse() {
spawnResultConsumers.arriveAndAwaitAdvance();
}

/**
* Signals to all potential consumers of the {@link #spawnResultFuture} that this execution has
* been cancelled and that the result will not be available.
Expand Down Expand Up @@ -1571,7 +1610,8 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult, Runnable
SpawnResult.Status.SUCCESS.equals(spawnResult.status()) && spawnResult.exitCode() == 0,
"shouldn't upload outputs of failed local action");

if (remoteOptions.remoteCacheAsync) {
if (remoteOptions.remoteCacheAsync
&& !action.getSpawn().getResourceOwner().mayModifySpawnOutputsAfterExecution()) {
Single.using(
remoteCache::retain,
remoteCache ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,89 +109,110 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
// results haven't been uploaded to the cache yet and deduplicate all of them against the
// first one.
LocalExecution previousExecution = null;
thisExecution = LocalExecution.createIfDeduplicatable(action);
if (shouldUploadLocalResults && thisExecution != null) {
previousExecution = inFlightExecutions.putIfAbsent(action.getActionKey(), thisExecution);
}
// Metadata will be available in context.current() until we detach.
// This is done via a thread-local variable.
try {
RemoteActionResult result;
try (SilentCloseable c = prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) {
result = remoteExecutionService.lookupCache(action);
}
// In case the remote cache returned a failed action (exit code != 0) we treat it as a
// cache miss
if (result != null && result.getExitCode() == 0) {
Stopwatch fetchTime = Stopwatch.createStarted();
InMemoryOutput inMemoryOutput;
try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "download outputs")) {
inMemoryOutput = remoteExecutionService.downloadOutputs(action, result);
}
fetchTime.stop();
totalTime.stop();
spawnMetrics
.setFetchTimeInMs((int) fetchTime.elapsed().toMillis())
.setTotalTimeInMs((int) totalTime.elapsed().toMillis())
.setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis());
SpawnResult spawnResult =
createSpawnResult(
digestUtil,
thisExecution = LocalExecution.createIfDeduplicatable(action);
if (shouldUploadLocalResults && thisExecution != null) {
LocalExecution previousOrThisExecution =
inFlightExecutions.merge(
tjgq marked this conversation as resolved.
Show resolved Hide resolved
action.getActionKey(),
result.getExitCode(),
/* cacheHit= */ true,
result.cacheName(),
inMemoryOutput,
result.getExecutionMetadata().getExecutionStartTimestamp(),
result.getExecutionMetadata().getExecutionCompletedTimestamp(),
spawnMetrics.build(),
spawn.getMnemonic());
return SpawnCache.success(spawnResult);
thisExecution,
(existingExecution, thisExecutionArg) -> {
if (existingExecution.registerForOutputReuse()) {
return existingExecution;
} else {
// The existing execution has completed and its results may have already
// been modified by its action, so we can't deduplicate against it. Instead,
// start a new in-flight execution.
return thisExecutionArg;
}
});
previousExecution =
previousOrThisExecution == thisExecution ? null : previousOrThisExecution;
}
} catch (CacheNotFoundException e) {
// Intentionally left blank
} catch (IOException e) {
if (BulkTransferException.allCausedByCacheNotFoundException(e)) {
try {
RemoteActionResult result;
try (SilentCloseable c =
prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) {
result = remoteExecutionService.lookupCache(action);
}
// In case the remote cache returned a failed action (exit code != 0) we treat it as a
// cache miss
if (result != null && result.getExitCode() == 0) {
Stopwatch fetchTime = Stopwatch.createStarted();
InMemoryOutput inMemoryOutput;
try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "download outputs")) {
inMemoryOutput = remoteExecutionService.downloadOutputs(action, result);
}
fetchTime.stop();
totalTime.stop();
spawnMetrics
.setFetchTimeInMs((int) fetchTime.elapsed().toMillis())
.setTotalTimeInMs((int) totalTime.elapsed().toMillis())
.setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis());
SpawnResult spawnResult =
createSpawnResult(
digestUtil,
action.getActionKey(),
result.getExitCode(),
/* cacheHit= */ true,
result.cacheName(),
inMemoryOutput,
result.getExecutionMetadata().getExecutionStartTimestamp(),
result.getExecutionMetadata().getExecutionCompletedTimestamp(),
spawnMetrics.build(),
spawn.getMnemonic());
return SpawnCache.success(spawnResult);
}
} catch (CacheNotFoundException e) {
// Intentionally left blank
} else {
String errorMessage = Utils.grpcAwareErrorMessage(e, verboseFailures);
if (isNullOrEmpty(errorMessage)) {
errorMessage = e.getClass().getSimpleName();
} catch (IOException e) {
if (BulkTransferException.allCausedByCacheNotFoundException(e)) {
// Intentionally left blank
} else {
String errorMessage = Utils.grpcAwareErrorMessage(e, verboseFailures);
if (isNullOrEmpty(errorMessage)) {
errorMessage = e.getClass().getSimpleName();
}
errorMessage = "Remote Cache: " + errorMessage;
remoteExecutionService.report(Event.warn(errorMessage));
}
errorMessage = "Remote Cache: " + errorMessage;
remoteExecutionService.report(Event.warn(errorMessage));
}
}
if (previousExecution != null) {
Stopwatch fetchTime = Stopwatch.createStarted();
SpawnResult previousResult;
try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "reuse outputs")) {
previousResult = remoteExecutionService.waitForAndReuseOutputs(action, previousExecution);
}
if (previousResult != null) {
spawnMetrics
.setFetchTimeInMs((int) fetchTime.elapsed().toMillis())
.setTotalTimeInMs((int) totalTime.elapsed().toMillis())
.setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis());
SpawnMetrics buildMetrics = spawnMetrics.build();
return SpawnCache.success(
new SpawnResult.DelegateSpawnResult(previousResult) {
@Override
public String getRunnerName() {
return "deduplicated";
}
if (previousExecution != null) {
Stopwatch fetchTime = Stopwatch.createStarted();
SpawnResult previousResult;
try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "reuse outputs")) {
previousResult =
remoteExecutionService.waitForAndReuseOutputs(action, previousExecution);
}
if (previousResult != null) {
spawnMetrics
.setFetchTimeInMs((int) fetchTime.elapsed().toMillis())
.setTotalTimeInMs((int) totalTime.elapsed().toMillis())
.setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis());
SpawnMetrics buildMetrics = spawnMetrics.build();
return SpawnCache.success(
new SpawnResult.DelegateSpawnResult(previousResult) {
@Override
public String getRunnerName() {
return "deduplicated";
}

@Override
public SpawnMetrics getMetrics() {
return buildMetrics;
}
});
@Override
public SpawnMetrics getMetrics() {
return buildMetrics;
}
});
}
// If we reach here, the previous execution was not successful (it encountered an
// exception or the spawn had an exit code != 0). Since it isn't possible to accurately
// recreate the failure without rerunning the action, we fall back to running the action
// locally. This means that we have introduced an unnecessary wait, but that can only
// happen in the case of a failing build with --keep_going.
}
} finally {
if (previousExecution != null) {
previousExecution.unregister();
}
// If we reach here, the previous execution was not successful (it encountered an exception
// or the spawn had an exit code != 0). Since it isn't possible to accurately recreate the
// failure without rerunning the action, we fall back to running the action locally. This
// means that we have introduced an unnecessary wait, but that can only happen in the case
// of a failing build with --keep_going.
}
}

Expand Down Expand Up @@ -239,6 +260,17 @@ public void store(SpawnResult result) throws ExecException, InterruptedException
// large.
remoteExecutionService.uploadOutputs(
action, result, () -> inFlightExecutions.remove(action.getActionKey()));
if (thisExecutionFinal != null
&& action.getSpawn().getResourceOwner().mayModifySpawnOutputsAfterExecution()) {
// In this case outputs have been uploaded synchronously and the callback above has run,
// so no new executions will be deduplicated against this one. We can safely await all
// existing executions finish the reuse.
// Note that while this call itself isn't interruptible, all operations it awaits are
// interruptible.
try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "await output reuse")) {
thisExecutionFinal.awaitAllOutputReuse();
}
}
}

private void checkForConcurrentModifications()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,14 @@ public NestedSet<Artifact> getPossibleInputsForTesting() {
return null;
}

@Override
public boolean mayModifySpawnOutputsAfterExecution() {
// Causes of spawn output modification after execution:
// - Fallback to the full classpath with --experimental_java_classpath=bazel.
// - In-place rewriting of .jdeps files with --experimental_output_paths=strip.
return true;
}

/**
* Locally rewrites a .jdeps file to replace missing config prefixes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ protected void afterExecute(
}
}

@Override
public boolean mayModifySpawnOutputsAfterExecution() {
// Causes of spawn output modification after execution:
// - In-place rewriting of .jdeps files with --experimental_output_paths=strip.
// TODO: Use separate files as action and spawn output to avoid in-place modification.
return true;
}

public static Builder newBuilder(RuleContext ruleContext) {
return new Builder(ruleContext);
}
Expand Down
Loading
Loading