From 18770435a114b93c44f0e022242488449cf4e889 Mon Sep 17 00:00:00 2001 From: ishland Date: Sat, 27 Feb 2021 17:43:49 +0800 Subject: [PATCH] Ensure all locks are available before real acquisition and task execution Closes https://github.com/YatopiaMC/C2ME-fabric/issues/2 --- .../common/threading/GlobalExecutors.java | 15 +++- .../threading/worldgen/ChunkStatusUtils.java | 12 ++-- .../c2me/common/util/AsyncCombinedLock.java | 70 +++++++++++++++++++ .../util/AsyncNamedLockDelegateAsyncLock.java | 29 ++++++++ 4 files changed, 121 insertions(+), 5 deletions(-) create mode 100644 src/main/java/org/yatopiamc/c2me/common/util/AsyncCombinedLock.java create mode 100644 src/main/java/org/yatopiamc/c2me/common/util/AsyncNamedLockDelegateAsyncLock.java diff --git a/src/main/java/org/yatopiamc/c2me/common/threading/GlobalExecutors.java b/src/main/java/org/yatopiamc/c2me/common/threading/GlobalExecutors.java index 53a26b2c..7929455b 100644 --- a/src/main/java/org/yatopiamc/c2me/common/threading/GlobalExecutors.java +++ b/src/main/java/org/yatopiamc/c2me/common/threading/GlobalExecutors.java @@ -1,14 +1,27 @@ package org.yatopiamc.c2me.common.threading; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.jetbrains.annotations.NotNull; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicReference; public class GlobalExecutors { public static final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor( 1, - new ThreadFactoryBuilder().setNameFormat("C2ME scheduler").setDaemon(true).setPriority(Thread.NORM_PRIORITY - 1).build() + new ThreadFactoryBuilder().setNameFormat("C2ME scheduler").setDaemon(true).setPriority(Thread.NORM_PRIORITY - 1).setThreadFactory(r -> { + final Thread thread = new Thread(r); + GlobalExecutors.schedulerThread.set(thread); + return thread; + }).build() ); + private static final AtomicReference schedulerThread = new AtomicReference<>(); + + public static void ensureSchedulerThread() { + if (Thread.currentThread() != schedulerThread.get()) + throw new IllegalStateException("Not on scheduler thread"); + } } diff --git a/src/main/java/org/yatopiamc/c2me/common/threading/worldgen/ChunkStatusUtils.java b/src/main/java/org/yatopiamc/c2me/common/threading/worldgen/ChunkStatusUtils.java index fa6c5d96..5d914459 100644 --- a/src/main/java/org/yatopiamc/c2me/common/threading/worldgen/ChunkStatusUtils.java +++ b/src/main/java/org/yatopiamc/c2me/common/threading/worldgen/ChunkStatusUtils.java @@ -7,12 +7,16 @@ import net.minecraft.world.chunk.ChunkStatus; import org.jetbrains.annotations.NotNull; import org.yatopiamc.c2me.common.threading.GlobalExecutors; +import org.yatopiamc.c2me.common.util.AsyncCombinedLock; +import org.yatopiamc.c2me.common.util.AsyncNamedLockDelegateAsyncLock; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Supplier; +import java.util.stream.Collectors; public class ChunkStatusUtils { @@ -36,14 +40,14 @@ public static ChunkStatusThreadingType getThreadingType(final ChunkStatus status } public static CompletableFuture runChunkGenWithLock(ChunkPos target, int radius, AsyncNamedLock chunkLock, Supplier> action) { - List> acquiredLocks = new ArrayList<>((radius + 1) * (radius + 1)); + List acquiredLocks = new ArrayList<>((radius + 1) * (radius + 1)); for (int x = target.x - radius; x <= target.x + radius; x++) for (int z = target.z - radius; z <= target.z + radius; z++) - acquiredLocks.add(chunkLock.acquireLock(new ChunkPos(x, z))); + acquiredLocks.add(new AsyncNamedLockDelegateAsyncLock<>(chunkLock, new ChunkPos(x, z))); - return Combinators.collect(acquiredLocks).toCompletableFuture().thenComposeAsync(lockTokens -> { + return new AsyncCombinedLock(new HashSet<>(acquiredLocks)).getFuture().thenComposeAsync(lockToken -> { final CompletableFuture future = action.get(); - future.thenRun(() -> lockTokens.forEach(AsyncLock.LockToken::releaseLock)); + future.thenRun(lockToken::releaseLock); return future; }, GlobalExecutors.scheduler); } diff --git a/src/main/java/org/yatopiamc/c2me/common/util/AsyncCombinedLock.java b/src/main/java/org/yatopiamc/c2me/common/util/AsyncCombinedLock.java new file mode 100644 index 00000000..c5487d08 --- /dev/null +++ b/src/main/java/org/yatopiamc/c2me/common/util/AsyncCombinedLock.java @@ -0,0 +1,70 @@ +package org.yatopiamc.c2me.common.util; + +import com.ibm.asyncutil.locks.AsyncLock; +import org.yatopiamc.c2me.common.threading.GlobalExecutors; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class AsyncCombinedLock { + + private final Set lockHandles; + private final CompletableFuture future = new CompletableFuture<>(); + + public AsyncCombinedLock(Set lockHandles) { + this.lockHandles = Set.copyOf(lockHandles); + GlobalExecutors.scheduler.execute(this::tryAcquire); + } + + private void tryAcquire() { + GlobalExecutors.ensureSchedulerThread(); + final Set tryLocks = lockHandles.stream().map(lock -> new LockEntry(lock, lock.tryLock())).collect(Collectors.toUnmodifiableSet()); + if (tryLocks.stream().allMatch(lockEntry -> lockEntry.lockToken.isPresent())) { + future.complete(new CombinedLockToken(tryLocks.stream().flatMap(lockEntry -> lockEntry.lockToken.stream()).collect(Collectors.toUnmodifiableSet()))); + } else { + tryLocks.stream().flatMap(lockEntry -> lockEntry.lockToken.stream()).forEach(AsyncLock.LockToken::releaseLock); + tryLocks.stream().unordered().filter(lockEntry -> lockEntry.lockToken.isEmpty()).findFirst().ifPresentOrElse(lockEntry -> + lockEntry.lock.acquireLock().thenCompose(lockToken -> { + lockToken.releaseLock(); + return CompletableFuture.runAsync(this::tryAcquire, GlobalExecutors.scheduler); + }), this::tryAcquire); + } + } + + public CompletableFuture getFuture() { + return future.thenApply(Function.identity()); + } + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + private static class LockEntry { + public final AsyncLock lock; + public final Optional lockToken; + + private LockEntry(AsyncLock lock, Optional lockToken) { + this.lock = lock; + this.lockToken = lockToken; + } + } + + private static class CombinedLockToken implements AsyncLock.LockToken { + + private final Set delegates; + + private CombinedLockToken(Set delegates) { + this.delegates = Set.copyOf(delegates); + } + + @Override + public void releaseLock() { + delegates.forEach(AsyncLock.LockToken::releaseLock); + } + + @Override + public void close() { + this.releaseLock(); + } + } +} diff --git a/src/main/java/org/yatopiamc/c2me/common/util/AsyncNamedLockDelegateAsyncLock.java b/src/main/java/org/yatopiamc/c2me/common/util/AsyncNamedLockDelegateAsyncLock.java new file mode 100644 index 00000000..be80a14b --- /dev/null +++ b/src/main/java/org/yatopiamc/c2me/common/util/AsyncNamedLockDelegateAsyncLock.java @@ -0,0 +1,29 @@ +package org.yatopiamc.c2me.common.util; + +import com.ibm.asyncutil.locks.AsyncLock; +import com.ibm.asyncutil.locks.AsyncNamedLock; + +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + +public class AsyncNamedLockDelegateAsyncLock implements AsyncLock { + + private final AsyncNamedLock delegate; + private final T name; + + public AsyncNamedLockDelegateAsyncLock(AsyncNamedLock delegate, T name) { + this.delegate = Objects.requireNonNull(delegate); + this.name = name; + } + + @Override + public CompletionStage acquireLock() { + return delegate.acquireLock(name); + } + + @Override + public Optional tryLock() { + return delegate.tryLock(name); + } +}