Skip to content

Commit

Permalink
fixes #205
Browse files Browse the repository at this point in the history
  • Loading branch information
infeo committed Feb 2, 2024
1 parent dae30ac commit 1170d22
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.cryptomator.cryptofs.ch;


import java.io.IOException;
import java.nio.channels.FileChannel;

@FunctionalInterface
public interface ChannelCloseListener {

void closed(CleartextFileChannel channel) throws IOException;
void closed(FileChannel ciphertextChannel);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.cryptomator.cryptofs.ch;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.cryptomator.cryptofs.CryptoFileSystemStats;
import org.cryptomator.cryptofs.EffectiveOpenOptions;
Expand Down Expand Up @@ -232,7 +233,8 @@ private void forceInternal(boolean metaData) throws IOException {
*
* @throws IOException
*/
private void flush() throws IOException {
@VisibleForTesting
void flush() throws IOException {
if (isWritable()) {
writeHeaderIfNeeded();
chunkCache.flush();
Expand All @@ -245,13 +247,14 @@ private void flush() throws IOException {
*
* @throws IOException
*/
private void persistLastModified() throws IOException {
@VisibleForTesting
void persistLastModified() throws IOException {
FileTime lastModifiedTime = isWritable() ? FileTime.from(lastModified.get()) : null;
FileTime lastAccessTime = FileTime.from(Instant.now());
var p = currentFilePath.get();
if (p != null) {
p.getFileSystem().provider()//
.getFileAttributeView(p, BasicFileAttributeView.class)
.getFileAttributeView(p, BasicFileAttributeView.class) //
.setTimes(lastModifiedTime, lastAccessTime, null);
}

Expand Down Expand Up @@ -321,7 +324,13 @@ long beginOfChunk(long cleartextPos) {
@Override
protected void implCloseChannel() throws IOException {
try {
flush();
try {
flush(); //flush cache content
} finally {
closeListener.closed(ciphertextFileChannel); //deregister channel from cache and possibly close file
}

ciphertextFileChannel.force(true); //flush this filechannel
try {
persistLastModified();
} catch (NoSuchFileException nsfe) {
Expand All @@ -331,8 +340,8 @@ protected void implCloseChannel() throws IOException {
LOG.warn("Failed to persist last modified timestamp for encrypted file: {}", e.getMessage());
}
} finally {
ciphertextFileChannel.close();
super.implCloseChannel();
closeListener.closed(this);
}
}
}
27 changes: 11 additions & 16 deletions src/main/java/org/cryptomator/cryptofs/fh/ChunkIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.nio.channels.FileChannel;
import java.nio.channels.NonReadableChannelException;
import java.nio.channels.NonWritableChannelException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -16,6 +15,8 @@ class ChunkIO {
private final Set<FileChannel> readableChannels = ConcurrentHashMap.newKeySet();
private final Set<FileChannel> writableChannels = ConcurrentHashMap.newKeySet();

private final PriorityMutex synchronizer = new PriorityMutex();

@Inject
public ChunkIO() {
}
Expand All @@ -39,34 +40,28 @@ public void registerChannel(FileChannel channel, boolean writable) {
* @param channel
*/
public void unregisterChannel(FileChannel channel) {
readableChannels.remove(channel);
writableChannels.remove(channel);
try (var token = synchronizer.dispensePriority()) {
readableChannels.remove(channel);
writableChannels.remove(channel);
}
}

int read(ByteBuffer dst, long position) throws IOException {
return getReadableChannel().read(dst, position);
}

int write(ByteBuffer src, long position) throws IOException {
return getWritableChannel().write(src, position);
try (var token = synchronizer.dispenseRegular()) {
return getWritableChannel().write(src, position);
}
}

private FileChannel getReadableChannel() {
Iterator<FileChannel> iter = readableChannels.iterator();
if (iter.hasNext()) {
return iter.next();
} else {
throw new NonReadableChannelException();
}
return readableChannels.stream().findAny().orElseThrow(NonReadableChannelException::new);
}

private FileChannel getWritableChannel() {
Iterator<FileChannel> iter = writableChannels.iterator();
if (iter.hasNext()) {
return iter.next();
} else {
throw new NonWritableChannelException();
}
return writableChannels.stream().findAny().orElseThrow(NonWritableChannelException::new);
}

}
28 changes: 11 additions & 17 deletions src/main/java/org/cryptomator/cryptofs/fh/OpenCryptoFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -42,7 +41,7 @@ public class OpenCryptoFile implements Closeable {
private final AtomicReference<Path> currentFilePath;
private final AtomicLong fileSize;
private final OpenCryptoFileComponent component;
private final ConcurrentMap<CleartextFileChannel, FileChannel> openChannels = new ConcurrentHashMap<>();
private final AtomicInteger openChannels = new AtomicInteger(0);

@Inject
public OpenCryptoFile(FileCloseListener listener, ChunkCache chunkCache, Cryptor cryptor, FileHeaderHolder headerHolder, ChunkIO chunkIO, @CurrentOpenFilePath AtomicReference<Path> currentFilePath, @OpenFileSize AtomicLong fileSize, @OpenFileModifiedDate AtomicReference<Instant> lastModified, OpenCryptoFileComponent component) {
Expand Down Expand Up @@ -87,14 +86,14 @@ public synchronized FileChannel newFileChannel(EffectiveOpenOptions options, Fil
if (cleartextFileChannel == null) { // i.e. something didn't work
closeQuietly(ciphertextFileChannel);
// is this the first file channel to be opened?
if (openChannels.isEmpty()) {
if (openChannels.get() == 0) {
close(); // then also close the file again.
}
}
}

assert cleartextFileChannel != null; // otherwise there would have been an exception
openChannels.put(cleartextFileChannel, ciphertextFileChannel);
openChannels.incrementAndGet();
chunkIO.registerChannel(ciphertextFileChannel, options.writable());
return cleartextFileChannel;
}
Expand Down Expand Up @@ -177,30 +176,25 @@ public Path getCurrentFilePath() {

/**
* Updates the current ciphertext file path, if it is not already set to null (i.e., the openCryptoFile is deleted)
*
* @param newFilePath new ciphertext path
*/
public void updateCurrentFilePath(Path newFilePath) {
currentFilePath.updateAndGet(p -> p == null ? null : newFilePath);
}

private synchronized void channelClosed(CleartextFileChannel cleartextFileChannel) throws IOException {
try {
FileChannel ciphertextFileChannel = openChannels.remove(cleartextFileChannel);
if (ciphertextFileChannel != null) {
chunkIO.unregisterChannel(ciphertextFileChannel);
ciphertextFileChannel.close();
}
} finally {
if (openChannels.isEmpty()) {
close();
}
private synchronized void channelClosed(FileChannel ciphertextFileChannel) {
int openChannels = this.openChannels.decrementAndGet();
chunkIO.unregisterChannel(ciphertextFileChannel);
if (openChannels == 0) {
close();
}
}

@Override
public void close() {
var p = currentFilePath.get();
if(p != null) {
if (p != null) {
listener.close(p, this);
}
}
Expand Down
108 changes: 108 additions & 0 deletions src/main/java/org/cryptomator/cryptofs/fh/PriorityMutex.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.cryptomator.cryptofs.fh;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/**
* Mutex with two (reentrant) states.
* <p>
* The mutex hands out redeemable tokens, depending on its state.
* There are two type of tokens, regular and priority ones.
* In the regular state, the mutex hands out regular tokens without blocking.
* On the first priority request, the mutex switches to priority state:
* * dispensing new regular tokens is blocked
* * priority requests block until the last dispensed regular token is redeemed.
* * afterward, all priority requests are handled without blocking
* <p>
* If the last handed out priority token is redeemed, the mutex switches back to its regular state, unblocking all regular requests.
* <p>
* Based on <a href="https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/locks/LockSupport.html">an JDK example</a>
*/
public class PriorityMutex {


private final AtomicInteger counter = new AtomicInteger(0);
private final AtomicInteger handedOutRegularTokens = new AtomicInteger(0);
private final AtomicInteger priorityRequests = new AtomicInteger(0);
private final Map<Integer, Thread> regularLane = new ConcurrentHashMap<>();
private final Map<Integer, Thread> fastLane = new ConcurrentHashMap<>();

/**
* Waits until all priority token requests are handled and redeemed and hands out a regular token.
*/
public Token dispenseRegular() {
boolean wasInterrupted = false;
// publish current thread to regular unparking lane
var tokenId = counter.incrementAndGet();
regularLane.put(tokenId, Thread.currentThread());

//Block while there are not handled or redeemed priority requests/tokens
while (priorityRequests.get() != 0) {
LockSupport.park(this);
// ignore interrupts while waiting
if (Thread.interrupted()) wasInterrupted = true;
}
handedOutRegularTokens.incrementAndGet();
regularLane.remove(tokenId);
// ensure correct interrupt status on return
if (wasInterrupted) Thread.currentThread().interrupt();
return () -> {
handedOutRegularTokens.decrementAndGet();
redeem();
};
}

/**
* Waits until all handed-out regular tokens are redeemed and hands out a priority token.
*/
public Token dispensePriority() {
priorityRequests.incrementAndGet();
boolean wasInterrupted = false;
// publish current thread to priority unparking lane
var tokenId = counter.incrementAndGet();
fastLane.put(tokenId, Thread.currentThread());

//Block while there are not redeemed regular tokens
while (handedOutRegularTokens.get() != 0) {
LockSupport.park(this);
// ignore interrupts while waiting
if (Thread.interrupted()) wasInterrupted = true;
}

fastLane.remove(tokenId);
// ensure correct interrupt status on return
if (wasInterrupted) Thread.currentThread().interrupt();
return () -> {
priorityRequests.decrementAndGet();
redeem();
};
}

private void redeem() {
if (priorityRequests.get() != 0) {
fastLane.forEach((id, thread) -> LockSupport.unpark(thread));
} else {
regularLane.forEach((id, thread) -> LockSupport.unpark(thread));
}
}

static {
// Reduce the risk of "lost unpark" due to classloading
Class<?> ensureLoaded = LockSupport.class;
}

@FunctionalInterface
interface Token extends AutoCloseable {

void redeem();

default void close() {
redeem();
}
}

;

}
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,23 @@ public void testForceWithoutMetadataDoesntUpdatesLastModifiedTime() throws IOExc
public class Close {

@Test
public void testCloseTriggersCloseListener() throws IOException {
inTest.implCloseChannel();
@DisplayName("IOException during flush still cleans up")
public void testCloseIoExceptionFlush() throws IOException {
var inSpy = Mockito.spy(inTest);
Mockito.doThrow(IOException.class).when(inSpy).flush();
Assertions.assertThrows(IOException.class, () -> inSpy.implCloseChannel());
verify(closeListener).closed(ciphertextFileChannel);
verify(ciphertextFileChannel).close();
}

verify(closeListener).closed(inTest);
@Test
@DisplayName("IOException during ciphertextChannel.force() still cleans up")
public void testCloseIoExceptionForce() throws IOException {
var inSpy = Mockito.spy(inTest);
Mockito.doThrow(IOException.class).when(ciphertextFileChannel).force(Mockito.anyBoolean());
Assertions.assertThrows(IOException.class, () -> inSpy.implCloseChannel());
verify(closeListener).closed(ciphertextFileChannel);
verify(ciphertextFileChannel).close();
}

@Test
Expand All @@ -251,6 +264,19 @@ public void testCloseUpdatesLastModifiedTimeIfWriteable() throws IOException {
verify(attributeView).setTimes(Mockito.eq(fileTime), Mockito.any(), Mockito.isNull());
}

@Test
@DisplayName("IOException on persisting lastModified during close is ignored")
public void testCloseExceptionOnLastModifiedPersistenceIgnored() throws IOException {
when(options.writable()).thenReturn(true);
lastModified.set(Instant.ofEpochMilli(123456789000l));

var inSpy = Mockito.spy(inTest);
Mockito.doThrow(IOException.class).when(inSpy).persistLastModified();

Assertions.assertDoesNotThrow(() -> inSpy.implCloseChannel());

}

@Test
public void testCloseDoesNotUpdateLastModifiedTimeIfReadOnly() throws IOException {
when(options.writable()).thenReturn(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public void triggerCloseListener() throws IOException {
Assumptions.assumeTrue(listener.get() != null);
Assumptions.assumeTrue(ciphertextChannel.get() != null);

listener.get().closed(cleartextFileChannel);
listener.get().closed(ciphertextChannel.get());
verify(chunkIO).unregisterChannel(ciphertextChannel.get());
}

Expand Down

0 comments on commit 1170d22

Please sign in to comment.