Skip to content

Commit

Permalink
Ensure all locks are available before real acquisition and task execu…
Browse files Browse the repository at this point in the history
…tion

Closes #2
  • Loading branch information
ishland committed Feb 27, 2021
1 parent 9611439 commit 1877043
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
package org.yatopiamc.c2me.common.threading;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;

public class GlobalExecutors {

public static final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(
1,
new ThreadFactoryBuilder().setNameFormat("C2ME scheduler").setDaemon(true).setPriority(Thread.NORM_PRIORITY - 1).build()
new ThreadFactoryBuilder().setNameFormat("C2ME scheduler").setDaemon(true).setPriority(Thread.NORM_PRIORITY - 1).setThreadFactory(r -> {
final Thread thread = new Thread(r);
GlobalExecutors.schedulerThread.set(thread);
return thread;
}).build()
);
private static final AtomicReference<Thread> schedulerThread = new AtomicReference<>();

public static void ensureSchedulerThread() {
if (Thread.currentThread() != schedulerThread.get())
throw new IllegalStateException("Not on scheduler thread");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
import net.minecraft.world.chunk.ChunkStatus;
import org.jetbrains.annotations.NotNull;
import org.yatopiamc.c2me.common.threading.GlobalExecutors;
import org.yatopiamc.c2me.common.util.AsyncCombinedLock;
import org.yatopiamc.c2me.common.util.AsyncNamedLockDelegateAsyncLock;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class ChunkStatusUtils {

Expand All @@ -36,14 +40,14 @@ public static ChunkStatusThreadingType getThreadingType(final ChunkStatus status
}

public static <T> CompletableFuture<T> runChunkGenWithLock(ChunkPos target, int radius, AsyncNamedLock<ChunkPos> chunkLock, Supplier<CompletableFuture<T>> action) {
List<CompletionStage<AsyncLock.LockToken>> acquiredLocks = new ArrayList<>((radius + 1) * (radius + 1));
List<AsyncLock> acquiredLocks = new ArrayList<>((radius + 1) * (radius + 1));
for (int x = target.x - radius; x <= target.x + radius; x++)
for (int z = target.z - radius; z <= target.z + radius; z++)
acquiredLocks.add(chunkLock.acquireLock(new ChunkPos(x, z)));
acquiredLocks.add(new AsyncNamedLockDelegateAsyncLock<>(chunkLock, new ChunkPos(x, z)));

return Combinators.collect(acquiredLocks).toCompletableFuture().thenComposeAsync(lockTokens -> {
return new AsyncCombinedLock(new HashSet<>(acquiredLocks)).getFuture().thenComposeAsync(lockToken -> {
final CompletableFuture<T> future = action.get();
future.thenRun(() -> lockTokens.forEach(AsyncLock.LockToken::releaseLock));
future.thenRun(lockToken::releaseLock);
return future;
}, GlobalExecutors.scheduler);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.yatopiamc.c2me.common.util;

import com.ibm.asyncutil.locks.AsyncLock;
import org.yatopiamc.c2me.common.threading.GlobalExecutors;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

public class AsyncCombinedLock {

private final Set<AsyncLock> lockHandles;
private final CompletableFuture<AsyncLock.LockToken> future = new CompletableFuture<>();

public AsyncCombinedLock(Set<AsyncLock> lockHandles) {
this.lockHandles = Set.copyOf(lockHandles);
GlobalExecutors.scheduler.execute(this::tryAcquire);
}

private void tryAcquire() {
GlobalExecutors.ensureSchedulerThread();
final Set<LockEntry> tryLocks = lockHandles.stream().map(lock -> new LockEntry(lock, lock.tryLock())).collect(Collectors.toUnmodifiableSet());
if (tryLocks.stream().allMatch(lockEntry -> lockEntry.lockToken.isPresent())) {
future.complete(new CombinedLockToken(tryLocks.stream().flatMap(lockEntry -> lockEntry.lockToken.stream()).collect(Collectors.toUnmodifiableSet())));
} else {
tryLocks.stream().flatMap(lockEntry -> lockEntry.lockToken.stream()).forEach(AsyncLock.LockToken::releaseLock);
tryLocks.stream().unordered().filter(lockEntry -> lockEntry.lockToken.isEmpty()).findFirst().ifPresentOrElse(lockEntry ->
lockEntry.lock.acquireLock().thenCompose(lockToken -> {
lockToken.releaseLock();
return CompletableFuture.runAsync(this::tryAcquire, GlobalExecutors.scheduler);
}), this::tryAcquire);
}
}

public CompletableFuture<AsyncLock.LockToken> getFuture() {
return future.thenApply(Function.identity());
}

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private static class LockEntry {
public final AsyncLock lock;
public final Optional<AsyncLock.LockToken> lockToken;

private LockEntry(AsyncLock lock, Optional<AsyncLock.LockToken> lockToken) {
this.lock = lock;
this.lockToken = lockToken;
}
}

private static class CombinedLockToken implements AsyncLock.LockToken {

private final Set<AsyncLock.LockToken> delegates;

private CombinedLockToken(Set<AsyncLock.LockToken> delegates) {
this.delegates = Set.copyOf(delegates);
}

@Override
public void releaseLock() {
delegates.forEach(AsyncLock.LockToken::releaseLock);
}

@Override
public void close() {
this.releaseLock();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.yatopiamc.c2me.common.util;

import com.ibm.asyncutil.locks.AsyncLock;
import com.ibm.asyncutil.locks.AsyncNamedLock;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

public class AsyncNamedLockDelegateAsyncLock<T> implements AsyncLock {

private final AsyncNamedLock<T> delegate;
private final T name;

public AsyncNamedLockDelegateAsyncLock(AsyncNamedLock<T> delegate, T name) {
this.delegate = Objects.requireNonNull(delegate);
this.name = name;
}

@Override
public CompletionStage<LockToken> acquireLock() {
return delegate.acquireLock(name);
}

@Override
public Optional<LockToken> tryLock() {
return delegate.tryLock(name);
}
}

0 comments on commit 1877043

Please sign in to comment.