Skip to content

Commit

Permalink
Rewrite ProtoSourceResolver to use FutureTask API
Browse files Browse the repository at this point in the history
  • Loading branch information
ascopes committed Feb 17, 2024
1 parent 304620f commit 530b698
Showing 1 changed file with 62 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,24 @@
*/
package io.github.ascopes.protobufmavenplugin.source;

import static java.util.function.Predicate.not;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toSet;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Named;
Expand Down Expand Up @@ -56,7 +62,7 @@ public final class ProtoSourceResolver implements AutoCloseable {

@Inject
public ProtoSourceResolver(ProtoArchiveExtractor protoArchiveExtractor) {
var concurrency = Runtime.getRuntime().availableProcessors() * 4;
var concurrency = Runtime.getRuntime().availableProcessors() * 8;

this.protoArchiveExtractor = protoArchiveExtractor;
executorService = Executors.newWorkStealingPool(concurrency);
Expand All @@ -74,37 +80,6 @@ public void close() {
}
}

public Collection<ProtoFileListing> createProtoFileListings(
Collection<Path> originalPaths
) throws IOException {
var futures = new ArrayList<CompletableFuture<Optional<ProtoFileListing>>>();

for (var originalPath : originalPaths) {
futures.add(createProtoFileListingAsync(originalPath));
}

var results = new ArrayList<ProtoFileListing>();
var exceptions = new ArrayList<Exception>();

for (var future : futures) {
try {
future.get().ifPresent(results::add);
} catch (ExecutionException | InterruptedException ex) {
exceptions.add(ex);
}
}

if (!exceptions.isEmpty()) {
var causeIterator = exceptions.iterator();
var ex = new IOException("Failed to create listings asynchronously");
ex.initCause(causeIterator.next());
causeIterator.forEachRemaining(ex::addSuppressed);
throw ex;
}

return results;
}

public Optional<ProtoFileListing> createProtoFileListing(Path path) throws IOException {
if (!Files.exists(path)) {
log.debug("Skipping lookup in path {} as it does not exist", path);
Expand All @@ -116,37 +91,69 @@ public Optional<ProtoFileListing> createProtoFileListing(Path path) throws IOExc
}

try (var stream = Files.walk(path)) {
var protoFiles = stream
return stream
.filter(ProtoFilePredicates::isProtoFile)
.peek(protoFile -> log.debug("Found proto file in root {}: {}", path, protoFile))
.collect(Collectors.toUnmodifiableSet());
.collect(collectingAndThen(toSet(), Optional::of))
.filter(not(Set::isEmpty))
.map(toListingForRootPath(path));
}
}

if (protoFiles.isEmpty()) {
return Optional.empty();
}
public Collection<ProtoFileListing> createProtoFileListings(
Collection<Path> originalPaths
) throws IOException {
var results = new ArrayList<Optional<ProtoFileListing>>();
var exceptions = new ArrayList<Exception>();

var listing = ImmutableProtoFileListing
.builder()
.addAllProtoFiles(protoFiles)
.protoFilesRoot(path)
.originalRoot(path)
.build();
originalPaths
.stream()
.map(this::submitProtoFileListingTask)
.collect(toSet()) // terminal operation to ensure all are scheduled prior to joining.
.stream()
.forEach(task -> partitionResultsForTasks(results, exceptions));

return Optional.of(listing);
if (!exceptions.isEmpty()) {
var causeIterator = exceptions.iterator();
var ex = new IOException("Failed to discover protobuf sources in some locations");
ex.initCause(causeIterator.next());
causeIterator.forEachRemaining(ex::addSuppressed);
throw ex;
}

return results
.stream()
.flatMap(Optional::stream)
.collect(toSet());
}

private Function<Set<Path>, ProtoFileListing> toListingForRootPath(Path rootPath) {
return protoFiles -> ImmutableProtoFileListing
.builder()
.addAllProtoFiles(protoFiles)
.protoFilesRoot(rootPath)
.originalRoot(rootPath)
.build();
}

private FutureTask<Optional<ProtoFileListing>> submitProtoFileListingTask(Path path) {
log.debug("Searching for proto files in '{}' asynchronously...", path);
var task = new FutureTask<>(() -> createProtoFileListing(path));
executorService.submit(task);
return task;
}

private CompletableFuture<Optional<ProtoFileListing>> createProtoFileListingAsync(
Path originalPath
// Generics are used to keep the signature sensible.
private <T> Consumer<FutureTask<T>> partitionResultsForTasks(
Collection<T> results,
Collection<Exception> exceptions
) {
var completableFuture = new CompletableFuture<Optional<ProtoFileListing>>();
executorService.submit(() -> {
return task -> {
try {
completableFuture.complete(createProtoFileListing(originalPath));
} catch (Exception ex) {
completableFuture.completeExceptionally(ex);
results.add(task.get());
} catch (ExecutionException | InterruptedException ex) {
exceptions.add(ex);
}
});
return completableFuture;
};
}
}

0 comments on commit 530b698

Please sign in to comment.